carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From manishgupt...@apache.org
Subject carbondata git commit: [CARBONDATA-2033] Support user specified segments in compaction
Date Mon, 30 Apr 2018 09:54:16 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 26607fb9c -> 5d55b83c1


[CARBONDATA-2033] Support user specified segments in compaction

Support custom compaction

This closes #1812


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5d55b83c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5d55b83c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5d55b83c

Branch: refs/heads/master
Commit: 5d55b83c14e049d91ec7e911000207e5aceb6f18
Parents: 26607fb
Author: Jin Zhou <xaprice@yeah.net>
Authored: Tue Jan 16 17:02:51 2018 +0800
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Mon Apr 30 15:27:14 2018 +0530

----------------------------------------------------------------------
 docs/data-management-on-carbondata.md           |  15 ++-
 ...CompactionSupportSpecifiedSegmentsTest.scala | 133 +++++++++++++++++++
 .../command/carbonTableSchemaCommon.scala       |   6 +-
 .../spark/rdd/AggregateDataMapCompactor.scala   |   3 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 +-
 .../spark/rdd/CarbonTableCompactor.scala        |  22 +--
 .../apache/carbondata/spark/rdd/Compactor.scala |  11 +-
 .../CarbonAlterTableCompactionCommand.scala     |  23 +++-
 .../preaaggregate/PreAggregateListeners.scala   |   4 +
 .../sql/parser/CarbonSpark2SqlParser.scala      |   8 +-
 .../processing/merger/CarbonDataMergerUtil.java |  38 +++++-
 .../processing/merger/CompactionType.java       |   1 +
 12 files changed, 239 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/docs/data-management-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/data-management-on-carbondata.md b/docs/data-management-on-carbondata.md
index a92ec4f..1f12b78 100644
--- a/docs/data-management-on-carbondata.md
+++ b/docs/data-management-on-carbondata.md
@@ -670,10 +670,10 @@ This tutorial is going to introduce all commands and data operations
on CarbonDa
 
   Compaction improves the query performance significantly. 
   
-  There are two types of compaction, Minor and Major compaction.
+  There are several types of compaction.
   
   ```
-  ALTER TABLE [db_name.]table_name COMPACT 'MINOR/MAJOR'
+  ALTER TABLE [db_name.]table_name COMPACT 'MINOR/MAJOR/CUSTOM'
   ```
 
   - **Minor Compaction**
@@ -699,6 +699,17 @@ This tutorial is going to introduce all commands and data operations
on CarbonDa
   ```
   ALTER TABLE table_name COMPACT 'MAJOR'
   ```
+  
+  - **Custom Compaction**
+  
+  In Custom compaction, user can directly specify segment ids to be merged into one large
segment. 
+  All specified segment ids should exist and be valid, otherwise compaction will fail. 
+  Custom compaction is usually done during the off-peak time. 
+  
+  ```
+  ALTER TABLE table_name COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (2,3,4)
+  ```
+  
 
   - **CLEAN SEGMENTS AFTER Compaction**
   

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportSpecifiedSegmentsTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportSpecifiedSegmentsTest.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportSpecifiedSegmentsTest.scala
new file mode 100644
index 0000000..6f9e49b
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportSpecifiedSegmentsTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the"License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an"AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+
+class CompactionSupportSpecifiedSegmentsTest
+  extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+
+  val filePath: String = resourcesPath + "/globalsort/sample1.csv"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+  }
+
+  override def beforeEach {
+    resetConf()
+    sql("DROP TABLE IF EXISTS seg_compact")
+    sql(
+      """
+        |CREATE TABLE seg_compact
+        |(id INT, name STRING, city STRING, age INT)
+        |STORED BY 'org.apache.carbondata.format'
+        |TBLPROPERTIES('SORT_COLUMNS'='city,name')
+      """.stripMargin)
+  }
+
+  override def afterEach {
+    sql("DROP TABLE IF EXISTS seg_compact")
+  }
+
+  private def resetConf() = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+        CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("custom compaction") {
+    for (i <- 0 until 5) {
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE seg_compact")
+    }
+    sql("ALTER TABLE seg_compact COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (1,2,3)")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE seg_compact")
+    val segInfos = segments.collect().map { each =>
+      ((each.toSeq) (0).toString, (each.toSeq) (1).toString)
+    }
+    assert(segInfos.length == 6)
+    assert(segInfos.contains(("0", "Success")))
+    assert(segInfos.contains(("1", "Compacted")))
+    assert(segInfos.contains(("2", "Compacted")))
+    assert(segInfos.contains(("3", "Compacted")))
+    assert(segInfos.contains(("1.1", "Success")))
+    assert(segInfos.contains(("4", "Success")))
+  }
+
+  test("custom compaction with invalid segment id"){
+    try{
+      for (i <- 0 until 5) {
+        sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE seg_compact")
+      }
+      sql("DELETE FROM TABLE seg_compact WHERE SEGMENT.ID IN (1)")
+      sql("ALTER TABLE seg_compact COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (1,2,3)")
+    }catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("does not exist or is not valid"))
+    }
+  }
+
+  test("custom segment ids must not be empty in custom compaction"){
+    try{
+      for (i <- 0 until 5) {
+        sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE seg_compact")
+      }
+      sql("ALTER TABLE seg_compact COMPACT 'CUSTOM'")
+    }catch {
+      case e: Exception =>
+        assert(e.isInstanceOf[MalformedCarbonCommandException])
+        assert(e.getMessage.startsWith("Segment ids should not be empty"))
+    }
+  }
+
+  test("custom segment ids not supported in major compaction"){
+    try{
+      for (i <- 0 until 5) {
+        sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE seg_compact")
+      }
+      sql("ALTER TABLE seg_compact COMPACT 'MAJOR' WHERE SEGMENT.ID IN (1,2,3)")
+    }catch {
+      case e: Exception =>
+        assert(e.isInstanceOf[MalformedCarbonCommandException])
+        assert(e.getMessage.startsWith("Custom segments not supported"))
+    }
+  }
+
+  test("custom segment ids not supported in minor compaction"){
+    try{
+      for (i <- 0 until 5) {
+        sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE seg_compact")
+      }
+      sql("ALTER TABLE seg_compact COMPACT 'MINOR' WHERE SEGMENT.ID IN (1,2,3)")
+    }catch {
+      case e: Exception =>
+        assert(e.isInstanceOf[MalformedCarbonCommandException])
+        assert(e.getMessage.startsWith("Custom segments not supported"))
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 3c21af3..bb3b73a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -126,7 +126,8 @@ case class AlterTableModel(
     segmentUpdateStatusManager: Option[SegmentUpdateStatusManager],
     compactionType: String,
     factTimeStamp: Option[Long],
-    var alterSql: String)
+    var alterSql: String,
+    customSegmentIds: Option[List[String]] = None)
 
 case class UpdateTableModel(
     isUpdate: Boolean,
@@ -138,7 +139,8 @@ case class CompactionModel(compactionSize: Long,
     compactionType: CompactionType,
     carbonTable: CarbonTable,
     isDDLTrigger: Boolean,
-    currentPartitions: Option[Seq[PartitionSpec]])
+    currentPartitions: Option[Seq[PartitionSpec]],
+    customSegmentIds: Option[List[String]])
 
 case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel,
     carbonTable: CarbonTable,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 1487277..82bae8e 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -109,7 +109,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
         //     because it contains the actual In-Process status for the segments.
         //  3. If we read the tablestatus then 8, 9, 10, 11 will keep getting compacted into
8.1.
         //  4. Therefore tablestatus file will be committed in between multiple commits.
-        if (!compactionModel.compactionType.equals(CompactionType.MAJOR)) {
+        if (!compactionModel.compactionType.equals(CompactionType.MAJOR) &&
+          !compactionModel.compactionType.equals(CompactionType.CUSTOM)) {
           if (!identifySegmentsToBeMerged().isEmpty) {
             val uuidTableStaus = CarbonTablePath.getTableStatusFilePathWithUUID(
               carbonTable.getTablePath, uuid)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index db22d6b..0b9bd66 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -211,7 +211,7 @@ object CarbonDataRDDFactory {
                 compactionModel.isDDLTrigger,
                 CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
                   TableIdentifier(table.getTableName,
-                  Some(table.getDatabaseName))))
+                  Some(table.getDatabaseName))), None)
               // proceed for compaction
               try {
                 CompactionFactory.getCompactor(
@@ -734,7 +734,7 @@ object CarbonDataRDDFactory {
         isCompactionTriggerByDDl,
         CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
           TableIdentifier(carbonTable.getTableName,
-          Some(carbonTable.getDatabaseName))))
+            Some(carbonTable.getDatabaseName))), None)
       var storeLocation = ""
       val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
       if (null != configuredStore && configuredStore.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 626a172..26adeee 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -54,13 +54,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     )
     CarbonDataMergerUtil.sortSegments(sortedSegments)
 
-    var segList = carbonLoadModel.getLoadMetadataDetails
-    var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-      carbonLoadModel,
-      compactionModel.compactionSize,
-      segList,
-      compactionModel.compactionType
-    )
+    var loadsToMerge = identifySegmentsToBeMerged()
+
     while (loadsToMerge.size() > 1 ||
            (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
             loadsToMerge.size() > 0)) {
@@ -77,7 +72,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
 
       // scan again and determine if anything is there to merge again.
       carbonLoadModel.readAndSetLoadMetadataDetails()
-      segList = carbonLoadModel.getLoadMetadataDetails
+      var segList = carbonLoadModel.getLoadMetadataDetails
       // in case of major compaction we will scan only once and come out as it will keep
       // on doing major for the new loads also.
       // excluding the newly added segments.
@@ -87,15 +82,12 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
       }
 
-      if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType) {
+      if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType ||
+        CompactionType.CUSTOM == compactionModel.compactionType) {
         loadsToMerge.clear()
       } else if (segList.size > 0) {
-        loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-          carbonLoadModel,
-          compactionModel.compactionSize,
-          segList,
-          compactionModel.compactionType
-        )
+        loadsToMerge = identifySegmentsToBeMerged()
+
         if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable)
{
           carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 6da8bd6..320cd78 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -17,8 +17,11 @@
 
 package org.apache.carbondata.spark.rdd
 
+import java.util
 import java.util.concurrent.ExecutorService
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.CompactionModel
 
@@ -39,11 +42,17 @@ abstract class Compactor(carbonLoadModel: CarbonLoadModel,
   def executeCompaction(): Unit
 
   def identifySegmentsToBeMerged(): java.util.List[LoadMetadataDetails] = {
+    val customSegmentIds: util.List[String] = if (compactionModel.customSegmentIds.isDefined)
{
+      compactionModel.customSegmentIds.get.asJava
+    } else {
+      new util.ArrayList[String]()
+    }
     CarbonDataMergerUtil
       .identifySegmentsToBeMerged(carbonLoadModel,
         compactionModel.compactionSize,
         carbonLoadModel.getLoadMetadataDetails,
-        compactionModel.compactionType)
+        compactionModel.compactionType,
+        customSegmentIds)
   }
 
   def deletePartialLoadsInCompaction(): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index e12f052..34ff1f8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -120,6 +120,18 @@ case class CarbonAlterTableCompactionCommand(
     } else if (compactionException.equalsIgnoreCase("false")) {
       Seq.empty
     } else {
+
+      if (compactionType != CompactionType.CUSTOM &&
+        alterTableModel.customSegmentIds.isDefined) {
+        throw new MalformedCarbonCommandException(
+          s"Custom segments not supported when doing ${compactionType.toString} compaction")
+      }
+      if (compactionType == CompactionType.CUSTOM &&
+        alterTableModel.customSegmentIds.isEmpty) {
+        throw new MalformedCarbonCommandException(
+          s"Segment ids should not be empty when doing ${compactionType.toString} compaction")
+      }
+
       val carbonLoadModel = new CarbonLoadModel()
       carbonLoadModel.setTableName(table.getTableName)
       val dataLoadSchema = new CarbonDataLoadSchema(table)
@@ -212,13 +224,20 @@ case class CarbonAlterTableCompactionCommand(
     carbonLoadModel.setFactTimeStamp(loadStartTime)
 
     val isCompactionTriggerByDDl = true
+    val segmentIds: Option[List[String]] = if (compactionType == CompactionType.CUSTOM &&
+      alterTableModel.customSegmentIds.isDefined) {
+      alterTableModel.customSegmentIds
+    } else {
+      None
+    }
     val compactionModel = CompactionModel(compactionSize,
       compactionType,
       carbonTable,
       isCompactionTriggerByDDl,
       CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
-        TableIdentifier(carbonTable.getTableName,
-        Some(carbonTable.getDatabaseName)))
+      TableIdentifier(carbonTable.getTableName,
+      Some(carbonTable.getDatabaseName))),
+      segmentIds
     )
 
     val isConcurrentCompactionAllowed = CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 1ce09fb..cb1c11b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent,
LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.merger.CompactionType
 
 object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
   /**
@@ -564,6 +565,9 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen
     val compactionEvent = event.asInstanceOf[AlterTableCompactionPreStatusUpdateEvent]
     val carbonTable = compactionEvent.carbonTable
     val compactionType = compactionEvent.carbonMergerMapping.campactionType
+    if (compactionType == CompactionType.CUSTOM) {
+      return
+    }
     val carbonLoadModel = compactionEvent.carbonLoadModel
     val sparkSession = compactionEvent.sparkSession
     if (CarbonUtil.hasAggregationDataMap(carbonTable)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index dbe39fb..540fdd2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -124,11 +124,13 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
 
   protected lazy val alterTable: Parser[LogicalPlan] =
-    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ opt(";")
 ^^ {
-      case dbName ~ table ~ (compact ~ compactType) =>
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) ~
+      (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",") <~
")").? <~
+      opt(";") ^^ {
+      case dbName ~ table ~ (compact ~ compactType) ~ segs =>
         val altertablemodel =
           AlterTableModel(convertDbNameToLowerCase(dbName), table, None, compactType,
-          Some(System.currentTimeMillis()), null)
+          Some(System.currentTimeMillis()), null, segs)
         CarbonAlterTableCompactionCommand(altertablemodel)
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 2ac351c..1744675 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -23,6 +23,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.*;
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -388,7 +389,8 @@ public final class CarbonDataMergerUtil {
    */
   public static List<LoadMetadataDetails> identifySegmentsToBeMerged(
           CarbonLoadModel carbonLoadModel, long compactionSize,
-          List<LoadMetadataDetails> segments, CompactionType compactionType) throws
IOException {
+          List<LoadMetadataDetails> segments, CompactionType compactionType,
+          List<String> customSegmentIds) throws IOException, MalformedCarbonCommandException
{
     String tablePath = carbonLoadModel.getTablePath();
     Map<String, String> tableLevelProperties = carbonLoadModel.getCarbonDataLoadSchema()
             .getCarbonTable().getTableInfo().getFactTable().getTableProperties();
@@ -396,6 +398,11 @@ public final class CarbonDataMergerUtil {
 
     sortSegments(sortedSegments);
 
+    if (CompactionType.CUSTOM == compactionType) {
+      return identitySegmentsToBeMergedBasedOnSpecifiedSegments(sortedSegments,
+              new HashSet<>(customSegmentIds));
+    }
+
     // Check for segments which are qualified for IUD compaction.
     if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
 
@@ -445,6 +452,35 @@ public final class CarbonDataMergerUtil {
   }
 
   /**
+   * This method will return the list of loads which are specified by user in SQL.
+   *
+   * @param listOfSegments
+   * @param segmentIds
+   * @return
+   */
+  private static List<LoadMetadataDetails> identitySegmentsToBeMergedBasedOnSpecifiedSegments(
+          List<LoadMetadataDetails> listOfSegments,
+          Set<String> segmentIds) throws MalformedCarbonCommandException {
+    Map<String, LoadMetadataDetails> specifiedSegments =
+            new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (LoadMetadataDetails detail : listOfSegments) {
+      if (segmentIds.contains(detail.getLoadName())) {
+        specifiedSegments.put(detail.getLoadName(), detail);
+      }
+    }
+    // all requested segments should exist and be valid
+    for (String segmentId : segmentIds) {
+      if (!specifiedSegments.containsKey(segmentId) ||
+              !isSegmentValid(specifiedSegments.get(segmentId))) {
+        String errMsg = String.format("Segment %s does not exist or is not valid", segmentId);
+        LOGGER.error(errMsg);
+        throw new MalformedCarbonCommandException(errMsg);
+      }
+    }
+    return new ArrayList<>(specifiedSegments.values());
+  }
+
+  /**
    * This method will return the list of loads which are loaded at the same interval.
    * This property is configurable.
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d55b83c/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
index 9ed87fc..bf8602c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
@@ -29,5 +29,6 @@ public enum CompactionType {
     IUD_DELETE_DELTA,
     STREAMING,
     CLOSE_STREAMING,
+    CUSTOM,
     NONE
 }


Mime
View raw message