carbondata-issues mailing list archives

From jackylk <...@git.apache.org>
Subject [GitHub] carbondata pull request #1755: [CARBONDATA-1976][PARTITION] Support combinat...
Date Thu, 04 Jan 2018 07:29:48 GMT
GitHub user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1755#discussion_r159592167
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
           sparkSession: SparkSession,
           table: CarbonTable,
           logicalPlan: LogicalPlan): Unit = {
    -    sparkSession.sessionState.catalog.listPartitions(
    +    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
           TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
    -      Some(partition.map(f => (f._1, f._2.get))))
    -    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
    +      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
    +    val partitionNames = existingPartitions.toList.flatMap { partition =>
    +      partition.spec.seq.map{case (column, value) => column + "=" + value}
    +    }.toSet
         val uniqueId = System.currentTimeMillis().toString
         val segments = new SegmentStatusManager(
           table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
    -    try {
    -      // First drop the partitions from partition mapper files of each segment
    -      new CarbonDropPartitionRDD(
    -        sparkSession.sparkContext,
    -        table.getTablePath,
    -        segments.asScala,
    -        partitionNames.toSeq,
    -        uniqueId).collect()
    -    } catch {
    -      case e: Exception =>
    -        // roll back the drop partitions from carbon store
    -        new CarbonDropPartitionCommitRDD(
    +    // If any existing partitions need to be overwritten then drop from partitionmap
    +    if (partitionNames.nonEmpty) {
    +      try {
    +        // First drop the partitions from partition mapper files of each segment
    +        new CarbonDropPartitionRDD(
               sparkSession.sparkContext,
               table.getTablePath,
               segments.asScala,
    -          false,
    +          partitionNames.toSeq,
               uniqueId).collect()
    -        throw e
    -    }
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
     
    -    try {
    +      try {
    +        Dataset.ofRows(sparkSession, logicalPlan)
    +      } catch {
    +        case e: Exception =>
    +          // roll back the drop partitions from carbon store
    +          new CarbonDropPartitionCommitRDD(
    +            sparkSession.sparkContext,
    +            table.getTablePath,
    +            segments.asScala,
    +            false,
    +            uniqueId).collect()
    +          throw e
    +      }
    +      // Commit the removed partitions in carbon store.
    +      new CarbonDropPartitionCommitRDD(
    +        sparkSession.sparkContext,
    +        table.getTablePath,
    +        segments.asScala,
    +        true,
    +        uniqueId).collect()
    +      // Update the loadstatus with update time to clear cache from driver.
    +      val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
    +        .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
    --- End diff ---
    
    Please make these two lines more readable; the nested constructor and chained calls are hard to follow.
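    
    For illustration only, a minimal sketch of one way the construction could be split into named steps. It assumes the same imports already present in CarbonLoadDataCommand.scala (java.util, SegmentStatusManager); the intermediate value names are hypothetical, not from the PR:
    
        // Hypothetical refactoring sketch: name each intermediate value so the
        // data flow reads top to bottom instead of nesting constructor calls.
        val statusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
        val validSegments = statusManager.getValidAndInvalidSegments.getValidSegments
        val segmentSet = new util.HashSet[String](validSegments)
    
    Naming the SegmentStatusManager instance would also let it be reused if the valid/invalid segment split were needed again later in the method.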

