carbondata-issues mailing list archives

From dhatchayani <...@git.apache.org>
Subject [GitHub] carbondata pull request #1702: [CARBONDATA-1896] Clean files operation impro...
Date Fri, 22 Dec 2017 15:24:30 GMT
Github user dhatchayani commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1702#discussion_r158510211
  
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
---
    @@ -315,88 +314,100 @@ object CarbonDataRDDFactory {
         val isSortTable = carbonTable.getNumberOfSortColumns > 0
         val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
     
    +    val segmentLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
    +      CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK)
    +
         try {
    -      if (updateModel.isDefined) {
    -        res = loadDataFrameForUpdate(
    -          sqlContext,
    -          dataFrame,
    -          carbonLoadModel,
    -          updateModel,
    -          carbonTable)
    -        res.foreach { resultOfSeg =>
    -          resultOfSeg.foreach { resultOfBlock =>
    -            if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
    -              loadStatus = SegmentStatus.LOAD_FAILURE
    -              if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
    -                updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
    -                updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
    -              } else {
    -                updateModel.get.executorErrors = resultOfBlock._2._2
    +      if (segmentLock.lockWithRetries()) {
    +        if (updateModel.isDefined) {
    +          res = loadDataFrameForUpdate(
    +            sqlContext,
    +            dataFrame,
    +            carbonLoadModel,
    +            updateModel,
    +            carbonTable)
    +          res.foreach { resultOfSeg =>
    +            resultOfSeg.foreach { resultOfBlock =>
    +              if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
    +                loadStatus = SegmentStatus.LOAD_FAILURE
    +                if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
    +                  updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
    +                  updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
    +                } else {
    +                  updateModel.get.executorErrors = resultOfBlock._2._2
    +                }
    +              } else if (resultOfBlock._2._1.getSegmentStatus ==
    +                         SegmentStatus.LOAD_PARTIAL_SUCCESS) {
    +                loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
    +                updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
    +                updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
                   }
    -            } else if (resultOfBlock._2._1.getSegmentStatus ==
    -                       SegmentStatus.LOAD_PARTIAL_SUCCESS) {
    -              loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
    -              updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
    -              updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
                 }
               }
    -        }
    -      } else {
    -          status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
    -          loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
    -        } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
    -          DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
    -            dataFrame, carbonLoadModel, hadoopConf)
    -        } else if (dataFrame.isDefined) {
    -          loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
             } else {
    -          loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
    -        }
    -        CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
    -          Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
    -        val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
    -        if (status.nonEmpty) {
    -          status.foreach { eachLoadStatus =>
    -            val state = newStatusMap.get(eachLoadStatus._1)
    -            state match {
    -              case Some(SegmentStatus.LOAD_FAILURE) =>
    -                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
    -              case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
    -                if eachLoadStatus._2._1.getSegmentStatus ==
    -                   SegmentStatus.SUCCESS =>
    -                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
    -              case _ =>
    -                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
    -            }
    +          status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
    +            loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
    +          } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
    +            DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
    +              dataFrame, carbonLoadModel, hadoopConf)
    +          } else if (dataFrame.isDefined) {
    +            loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
    +          } else {
    +            loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
               }
    -
    -          newStatusMap.foreach {
    -            case (key, value) =>
    -              if (value == SegmentStatus.LOAD_FAILURE) {
    -                loadStatus = SegmentStatus.LOAD_FAILURE
    -              } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
    -                         loadStatus != SegmentStatus.LOAD_FAILURE) {
    -                loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
    +          CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
    +            Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
    +          val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
    +          if (status.nonEmpty) {
    +            status.foreach { eachLoadStatus =>
    +              val state = newStatusMap.get(eachLoadStatus._1)
    +              state match {
    +                case Some(SegmentStatus.LOAD_FAILURE) =>
    +                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
    +                case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
    +                  if eachLoadStatus._2._1.getSegmentStatus ==
    +                     SegmentStatus.SUCCESS =>
    +                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
    +                case _ =>
    +                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
                   }
    -          }
    -        } else {
    -          // if no value is there in data load, make load status Success
    -          // and data load flow executes
    -          if (dataFrame.isDefined && updateModel.isEmpty) {
    -            val rdd = dataFrame.get.rdd
    -            if (rdd.partitions == null || rdd.partitions.length == 0) {
    -              LOGGER.warn("DataLoading finished. No data was loaded.")
    -              loadStatus = SegmentStatus.SUCCESS
    +            }
    +
    +            newStatusMap.foreach {
    +              case (key, value) =>
    +                if (value == SegmentStatus.LOAD_FAILURE) {
    +                  loadStatus = SegmentStatus.LOAD_FAILURE
    +                } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
    +                           loadStatus != SegmentStatus.LOAD_FAILURE) {
    +                  loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
    +                }
                 }
               } else {
    -            loadStatus = SegmentStatus.LOAD_FAILURE
    +            // if no value is there in data load, make load status Success
    +            // and data load flow executes
    +            if (dataFrame.isDefined && updateModel.isEmpty) {
    +              val rdd = dataFrame.get.rdd
    +              if (rdd.partitions == null || rdd.partitions.length == 0) {
    +                LOGGER.warn("DataLoading finished. No data was loaded.")
    +                loadStatus = SegmentStatus.SUCCESS
    +              }
    +            } else {
    +              loadStatus = SegmentStatus.LOAD_FAILURE
    +            }
               }
    -        }
     
    -        if (loadStatus != SegmentStatus.LOAD_FAILURE &&
    -            partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
    -          loadStatus = partitionStatus
    +          if (loadStatus != SegmentStatus.LOAD_FAILURE &&
    +              partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
    +            loadStatus = partitionStatus
    +          }
             }
    +      } else {
    +        LOGGER.audit("Not able to acquire the segment lock for table " +
    --- End diff --
    
    no
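
    The pattern the diff introduces is: acquire a per-segment lock via
    lockWithRetries() before running the update/load branches, and skip the
    load (with an audit log) when the lock cannot be acquired. Below is a
    minimal, self-contained Scala sketch of that shape. SegmentLock here is a
    hypothetical stand-in for the lock object returned by
    CarbonLockFactory.getCarbonLockObj; only lockWithRetries() appears in the
    diff, and releasing the lock in a finally block is an assumption about how
    the surrounding code is expected to clean up.

    object SegmentLockSketch {
      // Hypothetical stand-in for the lock object built from
      // CarbonLockFactory.getCarbonLockObj in the diff above.
      trait SegmentLock {
        def lockWithRetries(): Boolean
        def unlock(): Boolean
      }

      // Run `doLoad` only while holding the segment lock; otherwise log and
      // skip, mirroring the else branch at the end of the diff.
      def loadWithSegmentLock(lock: SegmentLock)(doLoad: () => Unit): Unit = {
        if (lock.lockWithRetries()) {
          try {
            doLoad() // the guarded update/load branches
          } finally {
            lock.unlock() // assumed cleanup: release the lock even on failure
          }
        } else {
          println("Not able to acquire the segment lock; skipping the load")
        }
      }
    }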


---
