carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [07/16] carbondata git commit: [CARBONDATA-1597] Remove spark1 integration
Date Tue, 24 Oct 2017 09:41:20 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
deleted file mode 100644
index 6eeeaf9..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ /dev/null
@@ -1,1088 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.concurrent._
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-import scala.util.Random
-import scala.util.control.Breaks._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
-import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
-import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
-import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.util.SparkUtil
-
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
-import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
-import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
-import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
-import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
-import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.loading.sort.SortScopeOptions
-import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.splits.TableSplit
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil, CarbonQueryUtil}
-import org.apache.carbondata.spark._
-import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
-
-/**
- * This is the factory object which creates different RDDs depending on the user's needs.
- *
- */
-object CarbonDataRDDFactory {
-
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * Handles a compaction request that was triggered through DDL.
-   *
-   * Resolves the requested compaction type and size, stamps the load model with the
-   * current time and then either goes through the system level lock flow (when concurrent
-   * compaction is disabled) or runs the compaction under the table level compaction lock.
-   *
-   * @param sqlContext      SQL context handed on to the compaction flow
-   * @param alterTableModel carries the requested compaction type and, for IUD delta
-   *                        compaction, the optional segment update status manager
-   * @param carbonLoadModel load model of the table to compact; it is updated in place
-   * @param storePath       passed through to the compaction flow
-   * @param storeLocation   passed through to the compaction flow
-   */
-  def alterTableForCompaction(sqlContext: SQLContext,
-      alterTableModel: AlterTableModel,
-      carbonLoadModel: CarbonLoadModel,
-      storePath: String,
-      storeLocation: String): Unit = {
-    val requestedType = alterTableModel.compactionType
-    val isMajor = requestedType.equalsIgnoreCase("major")
-    val isIudDelta = !isMajor &&
-      requestedType.equalsIgnoreCase(CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)
-
-    // anything that is neither "major" nor the IUD delta type is treated as minor.
-    val compactionType: CompactionType =
-      if (isMajor) {
-        CompactionType.MAJOR_COMPACTION
-      } else if (isIudDelta) {
-        CompactionType.IUD_UPDDEL_DELTA_COMPACTION
-      } else {
-        CompactionType.MINOR_COMPACTION
-      }
-    // a size is only looked up for major compaction, the other types use 0.
-    val compactionSize: Long =
-      if (isMajor) {
-        CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
-      } else {
-        0L
-      }
-
-    if (isIudDelta) {
-      // the update status manager is optional; when it is absent the load model is left
-      // untouched instead of failing on Option.get.
-      alterTableModel.segmentUpdateStatusManager.foreach { updateStatusManager =>
-        carbonLoadModel.setSegmentUpdateStatusManager(updateStatusManager)
-        carbonLoadModel.setLoadMetadataDetails(
-          updateStatusManager.getLoadMetadataDetails.toList.asJava)
-      }
-    }
-
-    LOGGER.audit(s"Compaction request received for table " +
-        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-
-    if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
-    }
-    // reading the start time of data load.
-    val loadStartTime = CarbonUpdateUtil.readCurrentTime()
-    carbonLoadModel.setFactTimeStamp(loadStartTime)
-
-    val isCompactionTriggerByDDl = true
-    val compactionModel = CompactionModel(compactionSize,
-      compactionType,
-      carbonTable,
-      isCompactionTriggerByDDl
-    )
-
-    val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-          CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-        )
-        .equalsIgnoreCase("true")
-
-    // if system level compaction is enabled then only one compaction can run in the system
-    // if any other request comes at this time then it will create a compaction request file.
-    // so that this will be taken up by the compaction process which is executing.
-    if (!isConcurrentCompactionAllowed) {
-      LOGGER.info("System level compaction lock is enabled.")
-      handleCompactionForSystemLocking(sqlContext,
-        carbonLoadModel,
-        storePath,
-        storeLocation,
-        compactionType,
-        carbonTable,
-        compactionModel
-      )
-    } else {
-      // normal flow of compaction
-      val lock = CarbonLockFactory
-          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-            LockUsage.COMPACTION_LOCK
-          )
-
-      if (lock.lockWithRetries()) {
-        LOGGER.info("Acquired the compaction lock for table" +
-            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        try {
-          startCompactionThreads(sqlContext,
-            carbonLoadModel,
-            storePath,
-            storeLocation,
-            compactionModel,
-            lock
-          )
-        } catch {
-          case e: Exception =>
-            LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
-            lock.unlock()
-            // this request is always DDL triggered, so the failure has to reach the caller,
-            // the same way the system level lock flow reports it.
-            throw e
-        }
-      } else {
-        LOGGER.audit("Not able to acquire the compaction lock for table " +
-            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        LOGGER.error(s"Not able to acquire the compaction lock for table" +
-            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        sys.error("Table is already locked for compaction. Please try after some time.")
-      }
-    }
-  }
-
-  /**
-   * Runs a compaction under the system level compaction lock.
-   *
-   * When the lock cannot be taken, a compaction request file is created for the table.
-   * Failures are raised to the caller only when the compaction model is flagged as DDL
-   * triggered; otherwise they are only logged.
-   */
-  def handleCompactionForSystemLocking(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      storePath: String,
-      storeLocation: String,
-      compactionType: CompactionType,
-      carbonTable: CarbonTable,
-      compactionModel: CompactionModel): Unit = {
-    // resolved on every use, exactly where the messages are built.
-    def qualifiedTableName: String =
-      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }"
-
-    val systemLock = CarbonLockFactory.getCarbonLockObj(
-      CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
-      LockUsage.SYSTEMLEVEL_COMPACTION_LOCK)
-
-    if (!systemLock.lockWithRetries()) {
-      LOGGER.audit(
-        "Not able to acquire the system level compaction lock for table " + qualifiedTableName)
-      LOGGER.error("Not able to acquire the compaction lock for table " + qualifiedTableName)
-      // leave a request file behind so that the request is not lost.
-      CarbonCompactionUtil
-          .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
-      val queuedMessage = "Compaction is in progress, compaction request for table " +
-          qualifiedTableName + " is in queue."
-      // do sys error only in case of DDL trigger.
-      if (compactionModel.isDDLTrigger) {
-        sys.error(queuedMessage)
-      } else {
-        LOGGER.error(queuedMessage)
-      }
-    } else {
-      LOGGER.info("Acquired the compaction lock for table " + qualifiedTableName)
-      try {
-        startCompactionThreads(sqlContext,
-          carbonLoadModel,
-          storePath,
-          storeLocation,
-          compactionModel,
-          systemLock)
-      } catch {
-        case failure: Exception =>
-          LOGGER.error(s"Exception in start compaction thread. ${ failure.getMessage }")
-          systemLock.unlock()
-          // if the compaction is a blocking call then only need to throw the exception.
-          if (compactionModel.isDDLTrigger) {
-            throw failure
-          }
-      }
-    }
-  }
-
-  /**
-   * Executes the compaction described by `compactionModel` and, when concurrent compaction
-   * is disabled, afterwards works through the other tables returned by
-   * `CarbonCompactionUtil.getNextTableToCompact`.
-   *
-   * The work is wrapped in a `Thread` whose `run()` is invoked directly, so the call blocks
-   * until everything is finished. The executor is shut down, partial loads of the compaction
-   * are deleted and `compactionLock` is released before returning.
-   *
-   * @param storePath      not used by this method; kept for the callers
-   * @param compactionLock lock already held by the caller; released here when done
-   * @throws Exception when the compaction of the triggering table failed
-   */
-  def startCompactionThreads(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      storePath: String,
-      storeLocation: String,
-      compactionModel: CompactionModel,
-      compactionLock: ICarbonLock): Unit = {
-    // update the updated table status. Done before the executor is created so that a
-    // failure here does not leave a thread pool behind.
-    CommonUtil.readLoadMetadataDetails(carbonLoadModel)
-    val executor: ExecutorService = Executors.newFixedThreadPool(1)
-    val compactionThread = new Thread {
-      override def run(): Unit = {
-
-        try {
-          // outcome of the compaction of the table which is triggered by the user.
-          val triggeredFailure: Option[Exception] =
-            try {
-              DataManagementFunc.executeCompaction(carbonLoadModel,
-                compactionModel,
-                executor, sqlContext, storeLocation
-              )
-              None
-            } catch {
-              case e: Exception =>
-                LOGGER.error(s"Exception in compaction thread ${ e.getMessage }")
-                Some(e)
-            }
-          // continue in case of exception also, check for all the tables.
-          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-              ).equalsIgnoreCase("true")
-
-          if (!isConcurrentCompactionAllowed) {
-            LOGGER.info("System level compaction lock is enabled.")
-            val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
-
-            // looks up the next table to compact, ignoring the tables in the skip list.
-            def nextTableToCompact(): CarbonTable = {
-              CarbonCompactionUtil
-                  .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
-                      .tablesMeta.map(_.carbonTable).toArray,
-                    skipCompactionTables.toList.asJava)
-            }
-
-            var table: CarbonTable = nextTableToCompact()
-            while (null != table) {
-              LOGGER.info("Compaction request has been identified for table " +
-                  s"${ table.getDatabaseName }." +
-                  s"${ table.getFactTableName }")
-              val metadataPath = table.getMetaDataFilepath
-              val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
-
-              val newCarbonLoadModel = new CarbonLoadModel()
-              DataManagementFunc.prepareCarbonLoadModel(table, newCarbonLoadModel)
-
-              val compactionSize = CarbonDataMergerUtil
-                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
-
-              val newcompactionModel = CompactionModel(compactionSize,
-                compactionType,
-                table,
-                compactionModel.isDDLTrigger
-              )
-              // proceed for compaction
-              try {
-                DataManagementFunc.executeCompaction(newCarbonLoadModel,
-                  newcompactionModel,
-                  executor, sqlContext, storeLocation
-                )
-              } catch {
-                case e: Exception =>
-                  // not handling the exception. only logging (with the cause) as this is
-                  // not the table triggered by user.
-                  LOGGER.error(e, "Exception in compaction thread for table " +
-                      s"${ table.getDatabaseName }." +
-                      s"${ table.getFactTableName }")
-              } finally {
-                // delete the compaction required file in case of failure or success also.
-                if (!CarbonCompactionUtil
-                    .deleteCompactionRequiredFile(metadataPath, compactionType)) {
-                  // if the compaction request file could not be deleted then add the
-                  // table to the skip list so that it won't be considered next.
-                  skipCompactionTables.+=:(table.getCarbonTableIdentifier)
-                  LOGGER.error("Compaction request file can not be deleted for table " +
-                      s"${ table.getDatabaseName }." +
-                      s"${ table.getFactTableName }")
-                }
-              }
-              // ********* check again for all the tables.
-              table = nextTableToCompact()
-            }
-          }
-          // report the failure of the user triggered compaction in both modes; the cause
-          // is attached so that the original stack trace is not lost.
-          triggeredFailure.foreach { cause =>
-            throw new Exception("Exception in compaction " + cause.getMessage, cause)
-          }
-        } finally {
-          try {
-            executor.shutdownNow()
-            DataManagementFunc.deletePartialLoadsInCompaction(carbonLoadModel)
-          } finally {
-            // release the lock even when the cleanup above fails.
-            compactionLock.unlock()
-          }
-        }
-      }
-    }
-    // calling the run method of a thread to make the call as blocking call.
-    // in the future we may make this as concurrent.
-    compactionThread.run()
-  }
-
-  def loadCarbonData(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      storePath: String,
-      columnar: Boolean,
-      partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
-      result: Option[DictionaryServer],
-      overwriteTable: Boolean,
-      dataFrame: Option[DataFrame] = None,
-      updateModel: Option[UpdateTableModel] = None): Unit = {
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val isAgg = false
-    /**
-     * Handles segment merging after a load: when
-     * `CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()` reports that merging is needed,
-     * a minor compaction (not DDL triggered) is started for the loaded table, either through
-     * the system level lock flow or under the table level compaction lock.
-     */
-    def handleSegmentMerging(): Unit = {
-      // evaluated once so that the logged value and the decision cannot differ.
-      val autoMergingRequired = CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()
-      LOGGER.info(s"compaction need status is" +
-          s" ${ autoMergingRequired }")
-      if (autoMergingRequired) {
-        LOGGER.audit(s"Compaction request received for table " +
-            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        val compactionSize = 0
-        val isCompactionTriggerByDDl = false
-        val compactionModel = CompactionModel(compactionSize,
-          CompactionType.MINOR_COMPACTION,
-          carbonTable,
-          isCompactionTriggerByDDl
-        )
-        // pick one of the configured local dirs at random; fall back to the JVM temp dir
-        // when none is configured (or the chosen entry is null/empty) so that the store is
-        // never created relative to the filesystem root.
-        val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-        val configuredDir: String =
-          if (null != configuredStore && configuredStore.nonEmpty) {
-            configuredStore(Random.nextInt(configuredStore.length))
-          } else {
-            null
-          }
-        val baseDir =
-          if (configuredDir == null || configuredDir.isEmpty) {
-            System.getProperty("java.io.tmpdir")
-          } else {
-            configuredDir
-          }
-        val storeLocation = baseDir + "/carbonstore/" + System.nanoTime()
-
-        val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-            CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-          )
-          .equalsIgnoreCase("true")
-
-        if (!isConcurrentCompactionAllowed) {
-
-          handleCompactionForSystemLocking(sqlContext,
-            carbonLoadModel,
-            storePath,
-            storeLocation,
-            CompactionType.MINOR_COMPACTION,
-            carbonTable,
-            compactionModel
-          )
-        } else {
-          val lock = CarbonLockFactory
-              .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-                LockUsage.COMPACTION_LOCK
-              )
-
-          if (lock.lockWithRetries()) {
-            LOGGER.info("Acquired the compaction lock.")
-            try {
-              startCompactionThreads(sqlContext,
-                carbonLoadModel,
-                storePath,
-                storeLocation,
-                compactionModel,
-                lock
-              )
-            } catch {
-              case e: Exception =>
-                LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
-                lock.unlock()
-                throw e
-            }
-          } else {
-            // the lock is held elsewhere: only log, nothing is thrown in this case.
-            LOGGER.audit("Not able to acquire the compaction lock for table " +
-                s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-            LOGGER.error("Not able to acquire the compaction lock for table " +
-                s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-          }
-        }
-      }
-    }
-
-    /**
-     * Records the outcome of the load in the load metadata.
-     *
-     * The metadata entry of the first result is reused when one is available; a fresh
-     * `LoadMetadataDetails` is used when `stat` is null, empty or starts with a null entry.
-     *
-     * @param loadStatus status to populate into the new load metadata entry
-     * @param stat       results collected from the load, may be null or empty
-     * @throws Exception when `CarbonLoaderUtil.recordLoadMetadata` reports failure
-     */
-    def updateStatus(loadStatus: String,
-        stat: Array[(String, (LoadMetadataDetails, ExecutionErrors))]): Unit = {
-      // headOption guards the empty array case, which would otherwise fail on stat(0).
-      val metadataDetails = Option(stat)
-        .flatMap(_.headOption)
-        .flatMap(first => Option(first))
-        .map(_._2._1)
-        .getOrElse(new LoadMetadataDetails)
-      CarbonLoaderUtil
-        .populateNewLoadMetaEntry(metadataDetails,
-          loadStatus,
-          carbonLoadModel.getFactTimeStamp,
-          true)
-      val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
-        carbonLoadModel, false, overwriteTable)
-      if (!status) {
-        val errorMessage = "Dataload failed due to failure in table status updation."
-        LOGGER.audit("Data load is failed for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        LOGGER.error(errorMessage)
-        throw new Exception(errorMessage)
-      }
-    }
-
-    try {
-      LOGGER.audit(s"Data load request has been received for table" +
-          s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      // Check if any load need to be deleted before loading new data
-      DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
-        carbonLoadModel.getTableName, storePath, false, carbonTable)
-      // get partition way from configuration
-      // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
-      // CarbonCommonConstants.TABLE_SPLIT_PARTITION,
-      // CarbonCommonConstants.TABLE_SPLIT_PARTITION_DEFAULT_VALUE).toBoolean
-      val isTableSplitPartition = false
-      var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
-      var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
-      var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
-
-      /**
-       * Loads the fact files referenced by the load model.
-       *
-       * Fills `blocksGroupBy` with the blocks to load per partition (table split mode) or
-       * per node (node partition mode), runs `NewCarbonDataLoadRDD` over them and stores the
-       * collected results in `status`.
-       */
-      def loadDataFile(): Unit = {
-        if (isTableSplitPartition) {
-          /*
-           * when data is handled by table split partition
-           * 1) get partition files, direct load or not will get the different files path
-           * 2) get files blocks by using SplitUtils
-           * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
-           */
-          if (carbonLoadModel.isDirectLoad) {
-            // get all table splits, this part means files were divided to different partitions
-            val splits =
-              CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
-            // get all partition blocks from file list
-            blocksGroupBy = splits.map { split =>
-              val partition = split.getPartition
-              // mkString joins the paths without leaving a trailing separator behind.
-              val filePaths = partition.getFilesPath.asScala.mkString(",")
-              (partition.getUniqueID,
-                SparkUtil.getSplits(filePaths, sqlContext.sparkContext))
-            }
-          } else {
-            // get all table splits, when we come to this, the data has been partitioned
-            val splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
-              carbonLoadModel.getTableName, null)
-            // get all partition blocks from factFilePath/uniqueID/
-            blocksGroupBy = splits.map { split =>
-              val factFilePath = carbonLoadModel.getFactFilePath
-              val separator =
-                if (factFilePath.endsWith("/") || factFilePath.endsWith("\\")) "" else "/"
-              val uniqueID = split.getPartition.getUniqueID
-              val partitionDir = factFilePath + separator + uniqueID + "/"
-              (uniqueID, SparkUtil.getSplits(partitionDir, sqlContext.sparkContext))
-            }
-          }
-        } else {
-          /*
-           * when data load is handled by node partition
-           * 1)clone the hadoop configuration,and set the file path to the configuration
-           * 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info
-           * 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block,
-           *   for locally writing carbondata files(one file one block) in nodes
-           *   use NewCarbonDataLoadRDD to load data and write to carbondata files
-           */
-          val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
-          // FileUtils will skip file which is no csv, and return all file path which split by ','
-          val filePaths = carbonLoadModel.getFactFilePath
-          hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths)
-          hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
-          hadoopConfiguration.set("io.compression.codecs",
-            """org.apache.hadoop.io.compress.GzipCodec,
-               org.apache.hadoop.io.compress.DefaultCodec,
-               org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
-
-          CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
-
-          val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-          val jobContext = new Job(hadoopConfiguration)
-          val rawSplits = inputFormat.getSplits(jobContext).toArray
-          val blockList = rawSplits.map { inputSplit =>
-            val fileSplit = inputSplit.asInstanceOf[FileSplit]
-            new TableBlockInfo(fileSplit.getPath.toString,
-              fileSplit.getStart, "1",
-              fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
-            ).asInstanceOf[Distributable]
-          }
-          // group blocks to nodes, tasks
-          val startTime = System.currentTimeMillis
-          val activeNodes = DistributionUtil
-              .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
-          val nodeBlockMapping =
-            CarbonLoaderUtil
-                .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
-                .toSeq
-          val timeElapsed: Long = System.currentTimeMillis - startTime
-          LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
-          LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
-              s"No.of Nodes: ${nodeBlockMapping.size}")
-          // one log line per node: its block count plus the locations of every block
-          // which is not hosted on that node.
-          var str = ""
-          nodeBlockMapping.foreach(entry => {
-            val tableBlock = entry._2
-            str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
-            tableBlock.asScala.foreach(tableBlockInfo =>
-              if (!tableBlockInfo.getLocations.exists(hostentry =>
-                hostentry.equalsIgnoreCase(entry._1)
-              )) {
-                str = str + " , mismatch locations: " + tableBlockInfo.getLocations
-                    .foldLeft("")((a, b) => a + "," + b)
-              }
-            )
-            str = str + "\n"
-          }
-          )
-          LOGGER.info(str)
-          blocksGroupBy = nodeBlockMapping.map(entry => {
-            val blockDetailsList =
-              entry._2.asScala.map(distributable => {
-                val tableBlock = distributable.asInstanceOf[TableBlockInfo]
-                new BlockDetails(new Path(tableBlock.getFilePath),
-                  tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
-                )
-              }).toArray
-            (entry._1, blockDetailsList)
-          }
-          ).toArray
-        }
-
-        status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
-          new DataLoadResultImpl(),
-          carbonLoadModel,
-          blocksGroupBy,
-          isTableSplitPartition).collect()
-      }
-
-      /**
-       * Loads the rows of the supplied data frame and stores the collected results in
-       * `status`.
-       *
-       * The number of distinct preferred hosts of the frame's partitions is used to request
-       * executors; the frame's RDD is then wrapped in a `DataLoadCoalescedRDD` over the
-       * returned nodes and loaded through `NewDataFrameLoaderRDD`. Any failure is logged
-       * and rethrown.
-       */
-      def loadDataFrame(): Unit = {
-        try {
-          val rdd = dataFrame.get.rdd
-          val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
-            DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
-          }.distinct.size
-          val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
-            sqlContext.sparkContext)
-          val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
-
-          status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
-            new DataLoadResultImpl(),
-            carbonLoadModel,
-            newRdd).collect()
-        } catch {
-          case ex: Exception =>
-            LOGGER.error(ex, "load data frame failed")
-            throw ex
-        }
-      }
-
-      def loadDataFrameForUpdate(): Unit = {
-        val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate
-
-        /**
-         * Runs the update load for one segment and reports its outcome.
-         *
-         * The load is executed while the returned iterator is being constructed; the
-         * iterator itself yields exactly one element carrying the load metadata and the
-         * execution errors of that load.
-         *
-         * @param key    id of the segment to load into
-         * @param taskNo task number, used in the load status id and set on the load model
-         * @param iter   rows handed to `UpdateDataLoad.DataLoadForUpdate`
-         * @return single element iterator with the result built by `updateResultImpl`
-         */
-        def triggerDataLoadForSegment(key: String, taskNo: Int,
-            iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
-          val rddResult = new updateResultImpl()
-          val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-          val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
-            val partitionID = "0"
-            val loadMetadataDetails = new LoadMetadataDetails
-            val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
-            var uniqueLoadStatusId = ""
-            try {
-              uniqueLoadStatusId = carbonLoadModel.getTableName +
-                                   CarbonCommonConstants.UNDERSCORE +
-                                   (taskNo + "_0")
-
-              val updatedTimeStamp = updateModel.get.updatedTimeStamp
-              loadMetadataDetails.setPartitionCount(partitionID)
-              loadMetadataDetails.setLoadName(key)
-              loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
-              carbonLoadModel.setPartitionId(partitionID)
-              carbonLoadModel.setSegmentId(key)
-              carbonLoadModel.setTaskNo(String.valueOf(taskNo))
-              carbonLoadModel.setFactTimeStamp(updatedTimeStamp)
-
-              // During Block Spill case Increment of File Count and proper adjustment of Block
-              // naming is only done when AbstractFactDataWriter.java : initializeWriter get
-              // CarbondataFileName as null. For handling Block Spill not setting the
-              // CarbondataFileName in case of Update.
-              // carbonLoadModel.setCarbondataFileName(newBlockName)
-
-              // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, index)
-              // NOTE(review): the status is switched to success before the load runs;
-              // presumably the load may adjust it through loadMetadataDetails - confirm.
-              loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-              UpdateDataLoad.DataLoadForUpdate(key,
-                taskNo,
-                iter,
-                carbonLoadModel,
-                loadMetadataDetails)
-            } catch {
-              case e: NoRetryException =>
-                // reported as partial success with bad records instead of failing the task.
-                loadMetadataDetails
-                  .setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-                executionErrors.failureCauses = FailureCauses.BAD_RECORDS
-                executionErrors.errorMsg = e.getMessage
-                LOGGER.info("Bad Record Found")
-              case e: Exception =>
-                LOGGER.info("DataLoad failure")
-                LOGGER.error(e)
-                throw e
-            }
-
-            var finished = false
-
-            override def hasNext: Boolean = !finished
-
-            override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
-              finished = true
-              rddResult
-                .getKey(uniqueLoadStatusId,
-                  (loadMetadataDetails, executionErrors))
-            }
-          }
-          resultIter
-        }
-
-        val updateRdd = dataFrame.get.rdd
-
-        // return directly if no rows to update
-        val noRowsToUpdate = updateRdd.isEmpty()
-        if (noRowsToUpdate) {
-          res = Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
-          return
-        }
-
-        // splitting as (key, value) i.e., (segment, updatedRows)
-        val keyRDD = updateRdd.map(row =>
-          (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
-
-        val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-          carbonTable.getMetaDataFilepath)
-        val segmentIds = loadMetadataDetails.map(_.getLoadName)
-        val segmentIdIndex = segmentIds.zipWithIndex.toMap
-        val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
-          carbonTable.getCarbonTableIdentifier)
-        val segmentId2maxTaskNo = segmentIds.map { segId =>
-          (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath))
-        }.toMap
-
-        /**
-         * Partitioner that reserves `parallelism` consecutive partitions for every segment.
-         *
-         * A key (segment id) is mapped to `segIdIndex(key) * parallelism + r`, where `r` is
-         * drawn at random from `[0, parallelism)`. The segment index can therefore be
-         * recovered from a partition id as `partitionId / parallelism`.
-         *
-         * @param segIdIndex  index of every known segment id; a key that is missing from this
-         *                    map makes getPartition fail with NoSuchElementException
-         * @param parallelism number of partitions reserved for each segment
-         */
-        class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
-          extends org.apache.spark.Partitioner {
-          // rely on the constructor argument only, not on locals of the enclosing method
-          override def numPartitions: Int = segIdIndex.size * parallelism
-
-          override def getPartition(key: Any): Int = {
-            val segId = key.asInstanceOf[String]
-            // the random offset spreads the rows of one segment over its partitions
-            segIdIndex(segId) * parallelism + Random.nextInt(parallelism)
-          }
-        }
-
-        val partitionByRdd = keyRDD.partitionBy(new SegmentPartitioner(segmentIdIndex,
-          segmentUpdateParallelism))
-
-        // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
-        // so segmentIdIndex=partitionId/parallelism, this has been verified.
-        res = partitionByRdd.map(_._2).mapPartitions { partition =>
-          val partitionId = TaskContext.getPartitionId()
-          val segIdIndex = partitionId / segmentUpdateParallelism
-          val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
-          val segId = segmentIds(segIdIndex)
-          val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1
-
-          List(triggerDataLoadForSegment(segId, newTaskNo, partition).toList).toIterator
-        }.collect()
-      }
-
-      /**
-       * Loads the data of a partitioned table.
-       *
-       * The input is repartitioned by repartitionInputData, handed to a
-       * PartitionTableDataLoaderRDD and the collected result is stored in `status`.
-       * Any exception is logged and rethrown unchanged.
-       */
-      def loadDataForPartitionTable(): Unit = {
-        try {
-          val repartitionedInput = repartitionInputData(sqlContext, dataFrame, carbonLoadModel)
-          val partitionLoaderRdd = new PartitionTableDataLoaderRDD(
-            sqlContext.sparkContext,
-            new DataLoadResultImpl(),
-            carbonLoadModel,
-            repartitionedInput)
-          status = partitionLoaderRdd.collect()
-        } catch {
-          case failure: Exception =>
-            LOGGER.error(failure, "load data failed for partition table")
-            throw failure
-        }
-      }
-
-      // create new segment folder  in carbon store
-      if (!updateModel.isDefined) {
-        CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
-          carbonLoadModel.getSegmentId, carbonTable)
-      }
-      var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-      var errorMessage: String = "DataLoad failure"
-      var executorMessage: String = ""
-      val configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel)
-      val sortScope = CarbonDataProcessorUtil.getSortScope(configuration)
-      try {
-        if (updateModel.isDefined) {
-          loadDataFrameForUpdate()
-        } else if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
-          loadDataForPartitionTable()
-        } else if (configuration.isSortTable &&
-            sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
-          LOGGER.audit("Using global sort for loading.")
-          status = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
-            dataFrame, carbonLoadModel)
-        } else if (dataFrame.isDefined) {
-          loadDataFrame()
-        } else {
-          loadDataFile()
-        }
-        if (updateModel.isDefined) {
-
-          res.foreach(resultOfSeg => resultOfSeg.foreach(
-            resultOfBlock => {
-              if (resultOfBlock._2._1.getLoadStatus
-                .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
-                loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-                if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
-                  updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
-                  updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
-                }
-                else {
-                  updateModel.get.executorErrors = resultOfBlock._2._2
-                }
-              } else if (resultOfBlock._2._1.getLoadStatus
-                .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
-                loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-                updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
-                updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
-              }
-            }
-          ))
-
-        }
-        else {
-          val newStatusMap = scala.collection.mutable.Map.empty[String, String]
-        if (status.nonEmpty) {
-          status.foreach { eachLoadStatus =>
-            val state = newStatusMap.get(eachLoadStatus._1)
-            state match {
-              case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
-              case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-                if eachLoadStatus._2._1.getLoadStatus ==
-                    CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
-              case _ =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
-            }
-          }
-
-          newStatusMap.foreach {
-            case (key, value) =>
-              if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
-                loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-              } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
-                  !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
-                loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-              }
-          }
-        } else {
-          loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-        }
-
-        if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
-            partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
-          loadStatus = partitionStatus
-        }
-      }
-      } catch {
-        case ex: Throwable =>
-          loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-          ex match {
-            case sparkException: SparkException =>
-              if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
-                  sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
-                executorMessage = sparkException.getCause.getMessage
-                errorMessage = errorMessage + ": " + executorMessage
-              }
-            case _ =>
-              executorMessage = ex.getCause.getMessage
-              errorMessage = errorMessage + ": " + executorMessage
-          }
-          LOGGER.info(errorMessage)
-          LOGGER.error(ex)
-      }
-      // handle the status file updation for the update cmd.
-      if (updateModel.isDefined) {
-
-      if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
-      // updateModel.get.executorErrors.errorMsg = errorMessage
-          if (updateModel.get.executorErrors.failureCauses == FailureCauses.NONE) {
-            updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
-            if (null != executorMessage && !executorMessage.isEmpty) {
-              updateModel.get.executorErrors.errorMsg = executorMessage
-            } else {
-              updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed."
-            }
-          }
-          return
-        } else if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
-                   updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
-                   carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
-          return
-        } else {
-          // in success case handle updation of the table status file.
-          // success case.
-          val segmentDetails = new util.HashSet[String]()
-
-          var resultSize = 0
-
-          res.foreach(resultOfSeg => {
-            resultSize = resultSize + resultOfSeg.size
-            resultOfSeg.foreach(
-            resultOfBlock => {
-              segmentDetails.add(resultOfBlock._2._1.getLoadName)
-            }
-          )}
-          )
-
-          // this means that the update doesnt have any records to update so no need to do table
-          // status file updation.
-          if (resultSize == 0) {
-            LOGGER.audit("Data update is successful with 0 rows updation for " +
-                         s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
-            return
-          }
-
-          if (
-            CarbonUpdateUtil
-              .updateTableMetadataStatus(segmentDetails,
-                carbonTable,
-                updateModel.get.updatedTimeStamp + "",
-                true,
-                new util.ArrayList[String](0))) {
-            LOGGER.audit("Data update is successful for " +
-                         s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-          }
-          else {
-            val errorMessage = "Data update failed due to failure in table status updation."
-            LOGGER.audit("Data update is failed for " +
-                         s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
-            LOGGER.error("Data update failed due to failure in table status updation.")
-            updateModel.get.executorErrors.errorMsg = errorMessage
-            updateModel.get.executorErrors.failureCauses = FailureCauses
-              .STATUS_FILE_UPDATION_FAILURE
-            return
-          }
-
-        }
-
-        return
-      }
-      if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
-        LOGGER.info("********starting clean up**********")
-        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
-        LOGGER.info("********clean up done**********")
-        LOGGER.audit(s"Data load is failed for " +
-            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        LOGGER.warn("Cannot write load metadata file as data load failed")
-        updateStatus(loadStatus, status)
-        throw new Exception(errorMessage)
-      } else {
-        // check if data load fails due to bad record and throw data load failure due to
-        // bad record exception
-        if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
-            status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
-            carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
-          LOGGER.info("********starting clean up**********")
-          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
-          LOGGER.info("********clean up done**********")
-          LOGGER.audit(s"Data load is failed for " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-          updateStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE, status)
-          throw new Exception(status(0)._2._2.errorMsg)
-        }
-        if (!isAgg) {
-            writeDictionary(carbonLoadModel, result)
-            updateStatus(loadStatus, status)
-        } else if (!carbonLoadModel.isRetentionRequest) {
-          // TODO : Handle it
-          LOGGER.info("********Database updated**********")
-        }
-
-        if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
-          LOGGER.audit("Data load is partially successful for " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        } else {
-          LOGGER.audit("Data load is successful for " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        }
-        try {
-          // compaction handling
-          handleSegmentMerging()
-        } catch {
-          case e: Exception =>
-            throw new Exception(
-              "Dataload is success. Auto-Compaction has failed. Please check logs.")
-        }
-      }
-    }
-
-  }
-
-  /**
-   * Repartitions the input data of a partition table load with the table's partitioner.
-   *
-   * Every input row is keyed by its value of the first partition column, the keyed RDD is
-   * shuffled with the partitioner obtained from PartitionFactory and the keys are dropped
-   * again. Rows that are null or too short to contain the partition column get a null key.
-   *
-   * @param sqlContext      provides the SparkContext used to read the CSV input
-   * @param dataFrame       input rows; when empty, the files at
-   *                        carbonLoadModel.getFactFilePath are read with CSVInputFormat
-   * @param carbonLoadModel load model supplying the table, the CSV header columns, the date
-   *                        formats and the delimiters
-   * @return the input rows after partitionBy, without their keys
-   * @throws DataLoadingException if the partition column is not among the CSV header columns
-   */
-  private def repartitionInputData(sqlContext: SQLContext,
-      dataFrame: Option[DataFrame],
-      carbonLoadModel: CarbonLoadModel): RDD[Row] = {
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
-    // only the first partition column is taken into account
-    val partitionColumnSchema = partitionInfo.getColumnSchemaList.get(0)
-    val partitionColumn = partitionColumnSchema.getColumnName
-    val partitionColumnDataType = partitionColumnSchema.getDataType
-    val columns = carbonLoadModel.getCsvHeaderColumns
-    val partitionColumnIndex = columns.indexWhere(partitionColumn.equalsIgnoreCase(_))
-    if (partitionColumnIndex == -1) {
-      throw new DataLoadingException("Partition column not found.")
-    }
-
-    val dateFormatMap = CarbonDataProcessorUtil.getDateFormatMap(carbonLoadModel.getDateFormat())
-    val specificFormat = Option(dateFormatMap.get(partitionColumn.toLowerCase))
-
-    // A format configured for the partition column wins; otherwise the carbon property (or
-    // its default) is used. Each call returns a fresh SimpleDateFormat instance.
-    def formatterFor(propertyKey: String, defaultPattern: String): SimpleDateFormat = {
-      val pattern = specificFormat.getOrElse(
-        CarbonProperties.getInstance().getProperty(propertyKey, defaultPattern))
-      new SimpleDateFormat(pattern)
-    }
-
-    val timeStampFormat = formatterFor(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-    val dateFormat = formatterFor(CarbonCommonConstants.CARBON_DATE_FORMAT,
-      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
-
-    // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
-    val inputRDD: RDD[(String, Row)] = dataFrame match {
-      case Some(inputFrame) =>
-        // input data from DataFrame
-        val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-        val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-        val serializationNullFormat =
-          carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-        inputFrame.rdd.map { row =>
-          val hasPartitionValue = null != row && row.length > partitionColumnIndex &&
-            null != row.get(partitionColumnIndex)
-          if (hasPartitionValue) {
-            (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
-              delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
-          } else {
-            (null, row)
-          }
-        }
-      case None =>
-        // input data from csv files
-        val hadoopConfiguration = new Configuration()
-        CommonUtil.configureCSVInputFormat(hadoopConfiguration, carbonLoadModel)
-        hadoopConfiguration.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
-        val columnCount = columns.length
-        new NewHadoopRDD[NullWritable, StringArrayWritable](
-          sqlContext.sparkContext,
-          classOf[CSVInputFormat],
-          classOf[NullWritable],
-          classOf[StringArrayWritable],
-          hadoopConfiguration
-        ).map { currentRow =>
-          val row = new StringArrayRow(new Array[String](columnCount))
-          if (null == currentRow || null == currentRow._2) {
-            (null, row)
-          } else {
-            val values = currentRow._2.get()
-            // the key stays null when the record has no value for the partition column
-            val key = if (values != null && values.length > partitionColumnIndex) {
-              values(partitionColumnIndex)
-            } else {
-              null
-            }
-            (key, row.setValues(values))
-          }
-        }
-    }
-
-    val partitioner = PartitionFactory.getPartitioner(partitionInfo)
-    if (partitionColumnDataType == DataTypes.STRING) {
-      if (partitionInfo.getPartitionType == PartitionType.RANGE) {
-        // string keys of range partitions are converted with ByteUtil.toBytes
-        inputRDD.map { case (key, row) => (ByteUtil.toBytes(key), row) }
-          .partitionBy(partitioner)
-          .map(_._2)
-      } else {
-        inputRDD.partitionBy(partitioner)
-          .map(_._2)
-      }
-    } else {
-      // other data types: convert the string key to the partition column's data type
-      inputRDD.map { case (key, row) =>
-        (PartitionUtil.getDataBasedOnDataType(key, partitionColumnDataType, timeStampFormat,
-          dateFormat), row)
-      }
-        .partitionBy(partitioner)
-        .map(_._2)
-    }
-  }
-
-    /**
-     * Writes the dictionary file of the loaded table through the dictionary server, if one
-     * was started for this load. Nothing else is done with the server here.
-     *
-     * @param carbonLoadModel load model identifying the table whose dictionary is written
-     * @param result          dictionary server of this load; nothing happens when it is empty
-     * @throws Exception if writing the dictionary fails; the original failure is attached
-     *                   as the cause
-     */
-    private def writeDictionary(carbonLoadModel: CarbonLoadModel,
-        result: Option[DictionaryServer]): Unit = {
-      result.foreach { server =>
-        try {
-          server.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-            .getCarbonTableIdentifier.getTableId)
-        } catch {
-          case ex: Exception =>
-            val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${
-              carbonLoadModel.getTableName }"
-            // keep the root cause in the log and in the rethrown exception
-            LOGGER.error(ex, s"Error while writing dictionary file for $uniqueTableName")
-            throw new Exception("Dataload failed due to error while writing dictionary file!", ex)
-        }
-      }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
deleted file mode 100644
index f8275d1..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.thriftserver
-
-import java.io.File
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.CarbonContext
-import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * Entry point that starts a HiveThriftServer2 on top of a CarbonContext.
- *
- * The optional first program argument is handed to CarbonContext as the store path.
- */
-object CarbonThriftServer {
-
-  private val PropertiesFileKey = "carbon.properties.filepath"
-  private val WarmUpTimeKey = "carbon.spark.warmUpTime"
-  private val DefaultWarmUpTime = "5000"
-
-  def main(args: Array[String]): Unit = {
-    val conf = new SparkConf()
-      .setAppName("Carbon Thrift Server")
-    if (!conf.contains(PropertiesFileKey)) {
-      // fall back to $SPARK_HOME/conf/carbon.properties when that file exists
-      Option(System.getenv("SPARK_HOME"))
-        .map(sparkHome => new File(s"$sparkHome/conf/carbon.properties"))
-        .filter(_.exists())
-        .foreach { file =>
-          conf.set(PropertiesFileKey, file.getCanonicalPath)
-          System.setProperty(PropertiesFileKey, file.getCanonicalPath)
-        }
-    } else {
-      System.setProperty(PropertiesFileKey, conf.get(PropertiesFileKey))
-    }
-    if (org.apache.spark.SPARK_VERSION.startsWith("1.6")) {
-      conf.set("spark.sql.hive.thriftServer.singleSession", "true")
-    }
-    val sc = new SparkContext(conf)
-    // the warm-up time is read after the SparkContext has been created, as before
-    Thread.sleep(warmUpMillis())
-    val cc = new CarbonContext(sc, args.headOption.orNull)
-
-    HiveThriftServer2.startWithContext(cc)
-  }
-
-  /**
-   * Reads the configured warm-up time in milliseconds.
-   *
-   * A value that is not a non-negative integer is reported and replaced by the default, so
-   * the log message and the time actually slept agree. Only the parsing is guarded; an
-   * interruption of the sleep itself is no longer reported as a wrong configuration value.
-   */
-  private def warmUpMillis(): Long = {
-    val warmUpTime = CarbonProperties.getInstance().getProperty(WarmUpTimeKey, DefaultWarmUpTime)
-    scala.util.Try(Integer.parseInt(warmUpTime)).filter(_ >= 0).map(_.toLong).getOrElse {
-      val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-      LOG.error(s"Wrong value for carbon.spark.warmUpTime $warmUpTime " +
-                "Using default Value and proceeding")
-      DefaultWarmUpTime.toLong
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
deleted file mode 100644
index 118249a..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
-
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
-
-// Pairs an untyped `rdd` value with a CarbonMetaData instance.
-// NOTE(review): `mataData` looks like a misspelling of `metaData`; it is a public field name,
-// so renaming it would affect callers - confirm before changing.
-case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
-
-/**
- * Helpers that derive Spark-side metadata from a CarbonTable.
- */
-object CarbonSparkUtil {
-
-  /**
-   * Builds the CarbonMetaData of the fact table of `carbonTable`.
-   *
-   * The result carries the dimension column names, the measure column names, the table
-   * itself and a DictionaryMap from lower-cased dimension name to a flag. The flag is true
-   * when the dimension has DICTIONARY encoding, has no DIRECT_DICTIONARY encoding and is
-   * not of a complex data type.
-   *
-   * @param carbonTable table whose fact-table dimensions and measures are inspected
-   * @return the metadata assembled from that table
-   */
-  def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
-    val factTableName = carbonTable.getFactTableName
-    val dimensions = carbonTable.getDimensionByTableName(factTableName).asScala
-    val dimensionNames = dimensions.map(_.getColName)
-    val measureNames = carbonTable.getMeasureByTableName(factTableName).asScala.map(_.getColName)
-    val dictionaryFlags = dimensions.map { dimension =>
-      val plainDictionary = dimension.hasEncoding(Encoding.DICTIONARY) &&
-        !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-        !dimension.getDataType.isComplexType
-      (dimension.getColName.toLowerCase, plainDictionary)
-    }
-    CarbonMetaData(dimensionNames, measureNames, carbonTable,
-      DictionaryMap(dictionaryFlags.toMap))
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
deleted file mode 100644
index 4950227..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
-
-
-/**
- * All the utility functions for carbon plan creation
- */
-object QueryPlanUtil {
-
-  /**
-   * Creates a CarbonTableInputFormat together with a Job that is built from a fresh
-   * Configuration and has the table path registered as its input path.
-   *
-   * @param absoluteTableIdentifier identifies the table whose path becomes the input path
-   * @return the new input format and the configured job
-   */
-  def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
-  (CarbonTableInputFormat[Array[Object]], Job) = {
-    val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
-    val job = newJobWithTablePath(absoluteTableIdentifier, new JobConf(new Configuration))
-    (carbonInputFormat, job)
-  }
-
-  /**
-   * Creates a CarbonTableInputFormat for values of type `V`.
-   *
-   * @param absoluteTableIdentifier identifies the table whose path is added to a job
-   * @param conf                    configuration the job is created from
-   * @return the new input format
-   */
-  def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
-      conf: Configuration) : CarbonTableInputFormat[V] = {
-    val carbonInputFormat = new CarbonTableInputFormat[V]()
-    // NOTE(review): the job is not returned and presumably works on its own copy of `conf`,
-    // so this call may have no effect visible to the caller - kept as in the original, confirm.
-    newJobWithTablePath(absoluteTableIdentifier, conf)
-    carbonInputFormat
-  }
-
-  /** Creates a Job from `conf` and adds the table path as its input path. */
-  private def newJobWithTablePath(absoluteTableIdentifier: AbsoluteTableIdentifier,
-      conf: Configuration): Job = {
-    // Job.getInstance replaces the deprecated Job constructor
-    val job: Job = Job.getInstance(conf)
-    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
-    job
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
deleted file mode 100644
index ea75ccb..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import java.lang.Long
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.{DataReadMethod, InputMetrics}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.hadoop.{CarbonMultiBlockSplit, InputMetricsStats}
-import org.apache.carbondata.spark.InitInputMetrics
-
-/**
- * It gives statistics of number of bytes and record read
- */
-class CarbonInputMetrics extends InitInputMetrics {
-  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-  var inputMetrics: InputMetrics = _
-  // None until initBytesReadCallback has run, so updateAndClose never dereferences null here
-  var bytesReadCallback: Option[() => scala.Long] = None
-  var carbonMultiBlockSplit: CarbonMultiBlockSplit = _
-
-  /**
-   * Fetches the task's input metrics for the Hadoop read method, remembers the split and,
-   * when the split is not null, asks SparkHadoopUtil for the bytes-read callback.
-   *
-   * @param context               task whose metrics are updated
-   * @param carbonMultiBlockSplit split read by the task
-   */
-  def initBytesReadCallback(context: TaskContext,
-                            carbonMultiBlockSplit: CarbonMultiBlockSplit): Unit = {
-    inputMetrics = context.taskMetrics().getInputMetricsForReadMethod(DataReadMethod.Hadoop)
-    this.carbonMultiBlockSplit = carbonMultiBlockSplit
-    // a type test against the static type only rejected null; say so directly
-    bytesReadCallback = Option(carbonMultiBlockSplit)
-      .flatMap(_ => SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
-  }
-
-  /** Adds `recordRead` to the number of records read. */
-  def incrementRecordRead(recordRead: Long): Unit = {
-    inputMetrics.incRecordsRead(recordRead)
-  }
-
-  /**
-   * Updates the bytes read: through the input metrics when a callback is available,
-   * otherwise from the length of the split. Does nothing when neither is set.
-   */
-  def updateAndClose(): Unit = {
-    if (bytesReadCallback.isDefined) {
-      // NOTE(review): bytesReadCallback is never passed to inputMetrics in this class;
-      // confirm that updateBytesRead() has a callback to use.
-      inputMetrics.updateBytesRead()
-    } else if (carbonMultiBlockSplit != null) {
-      // If we can't get the bytes read from the FS stats, fall back to the split size,
-      // which may be inaccurate.
-      try {
-        inputMetrics.incBytesRead(carbonMultiBlockSplit.getLength)
-      } catch {
-        case e: java.io.IOException =>
-          LOGGER.warn("Unable to get input size to set InputMetrics for task:" + e.getMessage)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
deleted file mode 100644
index a6c28a9..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExprId, LeafExpression, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.DataType
-
-import org.apache.carbondata.core.scan.expression.ColumnExpression
-
-case class CastExpr(expr: Expression) extends Filter
-
-case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)
-  extends LeafExpression with NamedExpression with CodegenFallback {
-
-  type EvaluatedType = Any
-
-  override def toString: String = s"input[" + colExp.getColIndex + "]"
-
-  override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)
-
-  override def name: String = colExp.getColumnName
-
-  override def toAttribute: Attribute = throw new UnsupportedOperationException
-
-  override def exprId: ExprId = throw new UnsupportedOperationException
-
-  override def qualifiers: Seq[String] = throw new UnsupportedOperationException
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
deleted file mode 100644
index 024c54b..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{ UnaryNode, _ }
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Top command
- */
-case class Top(count: Int, topOrBottom: Int, dim: NamedExpression, msr: NamedExpression,
-    child: LogicalPlan) extends UnaryNode {
-  def output: Seq[Attribute] = child.output
-
-  override def references: AttributeSet = {
-    val list = List(dim, msr)
-    AttributeSet(list.flatMap(_.references))
-  }
-}
-
-object getDB {
-
-  def getDatabaseName(dbName: Option[String], sqlContext: SQLContext): String = {
-    dbName.getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase)
-  }
-
-}
-
-/**
- * Shows Loads in a table
- */
-case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit: Option[String])
-  extends LogicalPlan with Command {
-
-  override def children: Seq[LogicalPlan] = Seq.empty
-
-  override def output: Seq[Attribute] = {
-    Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
-      AttributeReference("Status", StringType, nullable = false)(),
-      AttributeReference("Load Start Time", TimestampType, nullable = false)(),
-      AttributeReference("Load End Time", TimestampType, nullable = false)())
-  }
-}
-
-/**
- * Describe formatted for hive table
- */
-case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier)
-  extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-
-  override def output: Seq[AttributeReference] =
-    Seq(AttributeReference("result", StringType, nullable = false)())
-}
-
-case class CarbonDictionaryCatalystDecoder(
-    relations: Seq[CarbonDecoderRelation],
-    profile: CarbonProfile,
-    aliasMap: CarbonAliasDecoderRelation,
-    isOuter: Boolean,
-    child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
-
-abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
-  def isEmpty: Boolean = attributes.isEmpty
-}
-
-case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
-
-case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
-
-case class CreateDatabase(dbName: String, sql: String) extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[AttributeReference] = {
-    Seq()
-  }
-}
-
-case class DropDatabase(dbName: String, isCascade: Boolean, sql: String)
-    extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[AttributeReference] = {
-    Seq()
-  }
-}
-
-case class UseDatabase(sql: String) extends LogicalPlan with Command {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[AttributeReference] = {
-    Seq()
-  }
-}
-
-case class ProjectForUpdate(
-    table: UnresolvedRelation,
-    columns: List[String],
-    children: Seq[LogicalPlan] ) extends LogicalPlan with Command {
-  override def output: Seq[AttributeReference] = Seq.empty
-}
-
-case class UpdateTable(
-    table: UnresolvedRelation,
-    columns: List[String],
-    selectStmt: String,
-    filer: String) extends LogicalPlan {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[AttributeReference] = Seq.empty
-}
-
-case class DeleteRecords(
-    statement: String,
-    table: UnresolvedRelation) extends LogicalPlan {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[AttributeReference] = Seq.empty
-}
-
-case class ShowPartitions(
-    table: TableIdentifier) extends LogicalPlan {
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override def output: Seq[Attribute] = CommonUtil.partitionInfoOutput
-}
-
-/**
- * A logical plan representing insertion into Hive table.
- * This plan ignores nullability of ArrayType, MapType, StructType unlike InsertIntoTable
- * because Hive table doesn't have nullability for ARRAY, MAP, STRUCT types.
- */
-case class InsertIntoCarbonTable(
-    table: CarbonDatasourceRelation,
-    partition: Map[String, Option[String]],
-    child: LogicalPlan,
-    overwrite: Boolean,
-    ifNotExists: Boolean)
-  extends LogicalPlan with Command {
-
-  override def children: Seq[LogicalPlan] = child :: Nil
-  override def output: Seq[Attribute] = Seq.empty
-
-  // This is the expected schema of the table prepared to be inserted into,
-  // including dynamic partition columns.
-  val tableOutput = table.carbonRelation.output
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
deleted file mode 100644
index da4b210..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.io.File
-
-import scala.language.implicitConversions
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.catalyst.ParserDialect
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.ExtractPythonUDFs
-import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, PreWriteCheck}
-import org.apache.spark.sql.hive._
-import org.apache.spark.sql.optimizer.CarbonOptimizer
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
-
-class CarbonContext(
-    val sc: SparkContext,
-    val storePath: String,
-    metaStorePath: String) extends HiveContext(sc) {
-  self =>
-
-  def this(sc: SparkContext) = {
-    this(sc,
-      null,
-      new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
-  }
-
-  def this(sc: SparkContext, storePath: String) = {
-    this(sc,
-      storePath,
-      new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
-  }
-
-  CarbonContext.addInstance(sc, this)
-  CodeGenerateFactory.init(sc.version)
-  udf.register("getTupleId", () => "")
-  CarbonEnv.init(this)
-
-  var lastSchemaUpdatedTime = System.currentTimeMillis()
-  val hiveClientInterface = metadataHive
-
-  protected[sql] override lazy val conf: SQLConf = new CarbonSQLConf
-
-  @transient
-  override lazy val catalog = {
-    val carbonProperties = CarbonProperties.getInstance()
-    if (storePath != null) {
-      carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
-      // In case if it is in carbon.properties for backward compatible
-    } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
-      carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
-        conf.getConfString("spark.sql.warehouse.dir"))
-    }
-    new CarbonMetastore(this, storePath, metadataHive, queryId) with OverrideCatalog
-  }
-
-  @transient
-  override protected[sql] lazy val analyzer =
-    new Analyzer(catalog, functionRegistry, conf) {
-
-      override val extendedResolutionRules =
-        catalog.ParquetConversions ::
-        catalog.CreateTables ::
-        CarbonIUDAnalysisRule ::
-        CarbonPreInsertionCasts ::
-        ExtractPythonUDFs ::
-        ResolveHiveWindowFunction ::
-        PreInsertCastAndRename ::
-        Nil
-
-      override val extendedCheckRules = Seq(
-        PreWriteCheck(catalog)
-      )
-    }
-
-  @transient
-  override protected[sql] lazy val optimizer: Optimizer =
-    CarbonOptimizer.optimizer(
-      CodeGenerateFactory.createDefaultOptimizer(conf, sc),
-      conf.asInstanceOf[CarbonSQLConf],
-      sc.version)
-
-  protected[sql] override def getSQLDialect(): ParserDialect = new CarbonSQLDialect(this)
-
-  experimental.extraStrategies = {
-    val carbonStrategy = new CarbonStrategies(self)
-    Seq(carbonStrategy.CarbonTableScan, carbonStrategy.DDLStrategies)
-  }
-
-  override protected def configure(): Map[String, String] = {
-    sc.hadoopConfiguration.addResource("hive-site.xml")
-    if (sc.hadoopConfiguration.get(CarbonCommonConstants.HIVE_CONNECTION_URL) == null) {
-      val metaStorePathAbsolute = new File(metaStorePath).getCanonicalPath
-      val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
-      logDebug(s"metastore db is going to be created in location: $hiveMetaStoreDB")
-      super.configure() ++ Map[String, String]((CarbonCommonConstants.HIVE_CONNECTION_URL,
-        s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true"),
-        ("hive.metastore.warehouse.dir", metaStorePathAbsolute + "/hivemetadata"))
-    } else {
-      super.configure()
-    }
-  }
-
-  @transient
-  private val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
-
-  var queryId: String = ""
-
-  override def sql(sql: String): DataFrame = {
-    // queryId will be unique for each query, creting query detail holder
-    queryId = System.nanoTime() + ""
-    this.setConf("queryId", queryId)
-
-    CarbonContext.updateCarbonPorpertiesPath(this)
-    val sqlString = sql.toUpperCase
-    LOGGER.info(s"Query [$sqlString]")
-    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val statistic = new QueryStatistic()
-    val logicPlan: LogicalPlan = parseSql(sql)
-    statistic.addStatistics(QueryStatisticsConstants.SQL_PARSE, System.currentTimeMillis())
-    recorder.recordStatisticsForDriver(statistic, queryId)
-    val result = new DataFrame(this, logicPlan)
-
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result
-  }
-
-}
-
-object CarbonContext {
-
-  val datasourceName: String = "org.apache.carbondata.format"
-
-  val datasourceShortName: String = "carbondata"
-
-  @transient
-  private val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
-
-  final def updateCarbonPorpertiesPath(hiveContext: HiveContext) {
-    val carbonPropertiesFilePath = hiveContext.getConf("carbon.properties.filepath", null)
-    val systemcarbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
-    if (null != carbonPropertiesFilePath && null == systemcarbonPropertiesFilePath) {
-      System.setProperty("carbon.properties.filepath",
-        carbonPropertiesFilePath + "/" + "carbon.properties")
-    }
-    // configuring the zookeeper URl .
-    val zooKeeperUrl = hiveContext.getConf("spark.deploy.zookeeper.url", "127.0.0.1:2181")
-
-    CarbonProperties.getInstance().addProperty("spark.deploy.zookeeper.url", zooKeeperUrl)
-
-  }
-
-  // this cache is used to avoid creating multiple CarbonContext from same SparkContext,
-  // to avoid the derby problem for metastore
-  private val cache = collection.mutable.Map[SparkContext, CarbonContext]()
-
-  def getInstance(sc: SparkContext): CarbonContext = {
-    cache(sc)
-  }
-
-  def addInstance(sc: SparkContext, cc: CarbonContext): Unit = {
-    if (cache.contains(sc)) {
-      sys.error("creating multiple instances of CarbonContext is not " +
-                "allowed using the same SparkContext instance")
-    }
-    cache(sc) = cc
-  }
-}
-
-object SQLParser {
-  def parse(sql: String, sqlContext: SQLContext): LogicalPlan = sqlContext.parseSql(sql)
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
deleted file mode 100644
index 2c2e954..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{Job, JobID}
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark._
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources.{Filter, HadoopFsRelation, OutputWriterFactory}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.scan.expression.logical.AndExpression
-import org.apache.carbondata.core.util.DataTypeUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
-import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
-import org.apache.carbondata.processing.merger.TableMeta
-import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
-import org.apache.carbondata.spark.rdd.CarbonRDD
-import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
-import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
-
-private[sql] case class CarbonDatasourceHadoopRelation(
-  sqlContext: SQLContext,
-  paths: Array[String],
-  parameters: Map[String, String],
-  tableSchema: Option[StructType])
-  extends HadoopFsRelation {
-
-  lazy val schemaPath = new Path(CarbonTablePath.getSchemaFilePath(paths.head))
-  if (!schemaPath.getFileSystem(new Configuration).exists(schemaPath)) {
-    throw new IllegalArgumentException("invalid CarbonData file path: " + paths.head)
-  }
-
-  lazy val job = new Job(new JobConf())
-  lazy val options = new CarbonOption(parameters)
-  lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
-  lazy val relationRaw: CarbonRelation = {
-    val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
-    if (carbonTable == null) {
-      sys.error(s"CarbonData file path ${paths.head} is not valid")
-    }
-    CarbonRelation(
-      carbonTable.getDatabaseName,
-      carbonTable.getFactTableName,
-      CarbonSparkUtil.createSparkMeta(carbonTable),
-      new TableMeta(carbonTable.getCarbonTableIdentifier,
-        paths.head, absIdentifier.getTablePath, carbonTable),
-      None
-    )(sqlContext)
-  }
-
-  override def dataSchema: StructType = tableSchema.getOrElse(relationRaw.schema)
-
-  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
-    // TODO
-    throw new UnsupportedOperationException
-  }
-
-  override def buildScan(
-    requiredColumns: Array[String],
-    filters: Array[Filter],
-    inputFiles: Array[FileStatus]): RDD[Row] = {
-    val conf = new Configuration(job.getConfiguration)
-    filters.flatMap { filter =>
-      CarbonFilters.createCarbonFilter(dataSchema, filter)
-    }.reduceOption(new AndExpression(_, _))
-      .foreach(CarbonTableInputFormat.setFilterPredicates(conf, _))
-
-    val projection = new CarbonProjection
-    requiredColumns.foreach(projection.addColumn)
-    CarbonTableInputFormat.setColumnProjection(conf, projection)
-    CarbonTableInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
-
-    new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
-      new SerializableConfiguration(conf),
-      absIdentifier,
-      classOf[CarbonTableInputFormat[Row]],
-      classOf[Row]
-    )
-  }
-
-}
-
-class CarbonHadoopFSPartition(rddId: Int, val idx: Int,
-  val carbonSplit: SerializableWritable[CarbonInputSplit])
-  extends Partition {
-
-  override val index: Int = idx
-
-  override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-class CarbonHadoopFSRDD[V: ClassTag](
-  @transient sc: SparkContext,
-  conf: SerializableConfiguration,
-  identifier: AbsoluteTableIdentifier,
-  inputFormatClass: Class[_ <: CarbonTableInputFormat[V]],
-  valueClass: Class[V])
-  extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
-
-  private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    formatter.format(new Date())
-  }
-  @transient protected val jobId = new JobID(jobTrackerId, id)
-
-  override def internalCompute(split: Partition,
-    context: TaskContext): Iterator[V] = {
-    val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
-    val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)
-    val job: Job = new Job(hadoopAttemptContext.getConfiguration)
-    val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, job)
-    CarbonInputFormat.setDataTypeConverter(hadoopAttemptContext.getConfiguration,
-      new SparkDataTypeConverterImpl)
-    hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
-    val reader =
-      format.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
-        hadoopAttemptContext
-      )
-    reader.initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
-      hadoopAttemptContext
-    )
-    new Iterator[V] {
-      private[this] var havePair = false
-      private[this] var finished = false
-
-      override def hasNext: Boolean = {
-        if (context.isInterrupted) {
-          throw new TaskKilledException
-        }
-        if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
-          if (finished) {
-            reader.close()
-          }
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): V = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        reader.getCurrentValue
-      }
-    }
-  }
-
-  override protected def getPartitions: Array[Partition] = {
-    val jobContext = newJobContext(conf.value, jobId)
-    val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, new Job(conf.value))
-    jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
-    val splits = format.getSplits(jobContext).toArray
-    val carbonInputSplits = splits
-      .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit]))
-    carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1))
-  }
-}


Mime
View raw message