carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jackylk <...@git.apache.org>
Subject [GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...
Date Tue, 05 Dec 2017 15:07:53 GMT
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154972340
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand(
           }
         }
       }
    +
    +  private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): Unit = {
    +    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val loadMetaDataDetails = CarbonDataMergerUtil
    +      .identifySegmentsToBeMerged(carbonLoadModel,
    +        CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR),
    +        carbonLoadModel.getLoadMetadataDetails,
    +        carbonLoadModel.getCompactionType)
    +    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
    +    if (segments.nonEmpty) {
    +      CarbonSession
    +        .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
    +                   carbonLoadModel.getDatabaseName + "." +
    +                   carbonLoadModel.getTableName,
    +          segments.mkString(","))
    +      CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
    +                              carbonLoadModel.getDatabaseName + "." +
    +                              carbonLoadModel.getTableName, "false")
    +      val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
    +        .map(_.getColumnName).mkString(",")
    +      val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    +        .addPreAggLoadFunction(PreAggregateUtil
    +          .createChildSelectQuery(carbonTable.getTableInfo.getFactTable))).drop("preAggLoad")
    +      try {
    +        CarbonLoadDataCommand(Some(carbonTable.getDatabaseName),
    +          carbonTable.getTableName,
    +          null,
    +          Nil,
    +          Map("fileheader" -> headers),
    +          isOverwriteTable = false,
    +          dataFrame = Some(childDataFrame),
    +          internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
    +            "compactionType" -> carbonLoadModel.getCompactionType.toString)).run(sparkSession)
    +      } finally {
    +        // check if any other segments needs compaction on in case of MINOR_COMPACTION.
    +        // For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged to 0.2 if threshhold
    +        // allows it.
    +        if (!carbonLoadModel.getCompactionType.equals(CompactionType.MAJOR)) {
    +          CommonUtil.readLoadMetadataDetails(carbonLoadModel)
    +          startCompactionForDataMap(carbonLoadModel, sparkSession)
    --- End diff --
    
    We should avoid the recursive call; you can invoke it in the caller of this function instead.


---

Mime
View raw message