carbondata-commits mailing list archives

From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-1976][PARTITION] Support combination of dynamic and static partitions. And fix concurrent partition load issue.
Date Sat, 06 Jan 2018 16:01:38 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 829e7aa4e -> af4277e94


[CARBONDATA-1976][PARTITION] Support combination of dynamic and static partitions. And fix concurrent partition load issue.

Support combination of dynamic and static partitions.

This closes #1755
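
For context, the syntax this commit enables (and which the new tests below exercise) mixes a static value and dynamic columns in the same PARTITION clause. A minimal sketch, using the table names and the $resourcesPath variable from the added test suites:

    // Static value for empno, dynamic values for empname
    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiondynamic PARTITION(empno='1', empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
    // Same combination through insert overwrite
    sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname) select designation, doj, salary, empname from insertstaticpartitiondynamic""")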


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/af4277e9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/af4277e9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/af4277e9

Branch: refs/heads/master
Commit: af4277e9495faac3082e8c179cc049bcbeb699e2
Parents: 829e7aa
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Wed Jan 3 10:48:09 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Sun Jan 7 00:01:24 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/PartitionMapFileStore.java    |  40 +++++-
 .../hadoop/api/CarbonOutputCommitter.java       |   8 +-
 .../StandardPartitionTableDropTestCase.scala    |  28 +++++
 .../StandardPartitionTableLoadingTestCase.scala |  49 +++++++-
 ...tandardPartitionTableOverwriteTestCase.scala |  74 +++++++++++
 .../spark/rdd/CarbonDropPartitionRDD.scala      |   8 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   6 +-
 .../management/CarbonLoadDataCommand.scala      | 122 +++++++++++--------
 ...rbonAlterTableDropHivePartitionCommand.scala |  19 ++-
 .../datasources/CarbonFileFormat.scala          |  48 ++++++--
 .../sql/parser/CarbonSpark2SqlParser.scala      |   4 +-
 11 files changed, 327 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index d29dfbb..f7074c4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -237,15 +237,21 @@ public class PartitionMapFileStore {
    * @param segmentPath
    * @param partitionsToDrop
    * @param uniqueId
+   * @param partialMatch  If true, entries that match even a partial partition spec can also be
+   *                      dropped
    * @throws IOException
    */
-  public void dropPartitions(String segmentPath, List<String> partitionsToDrop, String uniqueId)
-      throws IOException {
+  public void dropPartitions(String segmentPath, List<String> partitionsToDrop, String uniqueId,
+      boolean partialMatch) throws IOException {
     readAllPartitionsOfSegment(segmentPath);
     List<String> indexesToDrop = new ArrayList<>();
     for (Map.Entry<String, List<String>> entry: partitionMap.entrySet()) {
-      for (String partition: partitionsToDrop) {
-        if (entry.getValue().contains(partition)) {
+      if (partialMatch) {
+        if (entry.getValue().containsAll(partitionsToDrop)) {
+          indexesToDrop.add(entry.getKey());
+        }
+      } else {
+        if (partitionsToDrop.containsAll(entry.getValue())) {
           indexesToDrop.add(entry.getKey());
         }
       }
@@ -302,7 +308,7 @@ public class PartitionMapFileStore {
 
     LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath());
     // scan through each segment.
-
+    List<String> segmentsNeedToBeDeleted = new ArrayList<>();
     for (LoadMetadataDetails segment : details) {
 
       // if this segment is valid then only we will go for deletion of related
@@ -318,6 +324,12 @@ public class PartitionMapFileStore {
         String partitionFilePath = getPartitionFilePath(segmentPath);
         if (partitionFilePath != null) {
           PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
+          if (partitionMapper.partitionMap.size() == 0) {
+            // There is no partition information, it means all partitions are dropped.
+            // So the segment needs to be marked as deleted.
+            segmentsNeedToBeDeleted.add(segment.getLoadName());
+            continue;
+          }
           DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
           SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
           indexFileStore.readAllIIndexOfSegment(segmentPath);
@@ -356,7 +368,11 @@ public class PartitionMapFileStore {
             // Delete all old partition files
             for (CarbonFile partitionFile : partitionFiles) {
              if (!currentPartitionFile.getName().equalsIgnoreCase(partitionFile.getName())) {
-                partitionFile.delete();
+                long fileTimeStamp = Long.parseLong(partitionFile.getName().substring(0,
+                    partitionFile.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
+                if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimeStamp) || forceDelete) {
+                  partitionFile.delete();
+                }
               }
             }
           }
@@ -370,6 +386,18 @@ public class PartitionMapFileStore {
         }
       }
     }
+    // If any segments need to be deleted, mark them as deleted
+    if (segmentsNeedToBeDeleted.size() > 0) {
+      try {
+        // Mark the segments as delete.
+        SegmentStatusManager.updateDeletionStatus(
+            table.getAbsoluteTableIdentifier(),
+            segmentsNeedToBeDeleted,
+            table.getMetaDataFilepath());
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
   }
 
   public List<String> getPartitions(String indexFileName) {
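
The partialMatch flag above changes how a segment index file's partition list is compared against the partitions being dropped: with partialMatch (the ALTER TABLE DROP PARTITION path) an index is dropped as soon as it carries the whole requested spec, without it (the overwrite path) only when every partition value of the index is covered by the drop list. A minimal sketch of just that rule; shouldDropIndex and its arguments are stand-ins for entry.getValue() and partitionsToDrop, not CarbonData API:

    def shouldDropIndex(indexPartitions: Set[String],
        partitionsToDrop: Set[String],
        partialMatch: Boolean): Boolean = {
      if (partialMatch) {
        // Drop when the index carries the full requested spec, even if it also
        // carries other partition values (containsAll on the index side).
        partitionsToDrop.subsetOf(indexPartitions)
      } else {
        // Drop only when every partition value of the index is being dropped/overwritten.
        indexPartitions.subsetOf(partitionsToDrop)
      }
    }

    // e.g. an index tagged Set("deptname=Learning", "doj=2016-01-01", "projectcode=1"):
    // shouldDropIndex(index, Set("deptname=Learning"), partialMatch = true)  == true
    // shouldDropIndex(index, Set("deptname=Learning"), partialMatch = false) == false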

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 525249a..bc7c56f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -83,7 +83,13 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
    * @throws IOException
    */
   @Override public void commitJob(JobContext context) throws IOException {
-    super.commitJob(context);
+    try {
+      super.commitJob(context);
+    } catch (IOException e) {
+      // Ignore. During a concurrent load, another load may already have removed the temporary
+      // folders, causing a file-not-found exception here. This does not impact the carbon load.
+      LOGGER.warn(e.getMessage());
+    }
     boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
     CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
     LoadMetadataDetails newMetaEntry = loadModel.getCurrentLoadMetadataDetail();
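
The change above tolerates the case where, under concurrent loads, the parent FileOutputCommitter fails to clean up a temporary folder that another load has already removed; the carbon-specific commit that follows is unaffected. A hedged sketch of the pattern (not CarbonData API; cleanup stands in for super.commitJob):

    def tolerateConcurrentCleanup(log: String => Unit)(cleanup: => Unit): Unit = {
      try {
        cleanup
      } catch {
        case e: java.io.IOException =>
          // Another concurrent load may already have removed the shared temporary
          // folder; a missing file here does not affect the commit that follows.
          log(e.getMessage)
      }
    }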

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
index 9a9940b..2aa9145 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
@@ -155,6 +155,33 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAll
 
   }
 
+
+  test("dropping all partition on table and do compaction") {
+    sql(
+      """
+        | CREATE TABLE partitionallcompaction (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""ALTER TABLE partitionallcompaction DROP PARTITION(deptname='Learning')""")
+    sql(s"""ALTER TABLE partitionallcompaction DROP PARTITION(deptname='configManagement')""")
+    sql(s"""ALTER TABLE partitionallcompaction DROP PARTITION(deptname='network')""")
+    sql(s"""ALTER TABLE partitionallcompaction DROP PARTITION(deptname='protocol')""")
+    sql(s"""ALTER TABLE partitionallcompaction DROP PARTITION(deptname='security')""")
+    assert(sql(s"""SHOW PARTITIONS partitionallcompaction""").collect().length == 0)
+    sql("ALTER TABLE partitionallcompaction COMPACT 'MAJOR'").collect()
+    checkAnswer(
+      sql(s"""select count (*) from partitionallcompaction"""),
+      Seq(Row(0)))
+  }
+
   override def afterAll = {
     dropTable
   }
@@ -167,6 +194,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAll
     sql("drop table if exists partitionmany")
     sql("drop table if exists partitionshow")
     sql("drop table if exists staticpartition")
+    sql("drop table if exists partitionallcompaction")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index b399138..25e73c4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -17,6 +17,8 @@
 package org.apache.carbondata.spark.testsuite.standardpartition
 
 import java.io.{File, IOException}
+import java.util
+import java.util.concurrent.{Callable, ExecutorService, Executors}
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.test.util.QueryTest
@@ -31,7 +33,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
-
+  var executorService: ExecutorService = _
   override def beforeAll {
     dropTable
 
@@ -275,6 +277,47 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll
     }
   }
 
+  test("concurrent partition table load test") {
+    executorService = Executors.newCachedThreadPool()
+    sql(
+      """
+        | CREATE TABLE partitionmultiplethreeconcurrent (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('DICTIONARY_INCLUDE'='empname,designation,deptname')
+      """.stripMargin)
+
+    val tasks = new util.ArrayList[Callable[String]]()
+    tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""))
+    tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""))
+    tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""))
+    val results = executorService.invokeAll(tasks)
+    for (i <- 0 until tasks.size()) {
+      val res = results.get(i).get
+      assert("PASS".equals(res))
+    }
+    executorService.shutdown()
+    checkAnswer(sql("select count(*) from partitionmultiplethreeconcurrent"), Seq(Row(30)))
+  }
+
+  class QueryTask(query: String) extends Callable[String] {
+    override def call(): String = {
+      var result = "PASS"
+      try {
+        LOGGER.info("Executing :" + Thread.currentThread().getName)
+        sql(query)
+      } catch {
+        case ex: Exception =>
+          ex.printStackTrace()
+          result = "FAIL"
+      }
+      result
+    }
+  }
+
  test("merge carbon index disable data loading for partition table for three partition column") {
     CarbonProperties.getInstance.addProperty(
       CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
@@ -396,6 +439,9 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll
 
   override def afterAll = {
     dropTable
+    if (executorService != null && !executorService.isShutdown) {
+      executorService.shutdownNow()
+    }
   }
 
   def dropTable = {
@@ -413,6 +459,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql("drop table if exists streamingpartitionedtable")
     sql("drop table if exists mergeindexpartitionthree")
     sql("drop table if exists loadstaticpartitiononeissue")
+    sql("drop table if exists partitionmultiplethreeconcurrent")
     sql("drop table if exists loadpartitionwithspecialchar")
     sql("drop table if exists emp1")
     sql("drop table if exists restorepartition")
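
As a usage note, the concurrent-load test above follows a simple pattern: submit the same LOAD statement from several threads and assert that every task reports success. A small standalone sketch of that pattern, assuming run is whatever executes a SQL statement:

    import java.util.concurrent.{Callable, Executors}
    import scala.collection.JavaConverters._

    def runConcurrently(statements: Seq[String])(run: String => Unit): Boolean = {
      val pool = Executors.newCachedThreadPool()
      try {
        val tasks = statements.map { stmt =>
          new Callable[Boolean] {
            // Mirror QueryTask: swallow the exception and report failure as false.
            override def call(): Boolean =
              try { run(stmt); true } catch { case _: Exception => false }
          }
        }
        pool.invokeAll(tasks.asJava).asScala.forall(_.get())
      } finally {
        pool.shutdown()
      }
    }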

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
index 945542a..15126b6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.spark.testsuite.standardpartition
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -85,6 +86,75 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAfterAll
      sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)"))
   }
 
+  test("dynamic and static partition table with load syntax") {
+    sql(
+      """
+        | CREATE TABLE loadstaticpartitiondynamic (designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int, empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiondynamic PARTITION(empno='1', empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql(s"select count(*) from loadstaticpartitiondynamic where empno=1"), sql(s"select count(*) from loadstaticpartitiondynamic"))
+  }
+
+  test("dynamic and static partition table with overwrite ") {
+    sql(
+      """
+        | CREATE TABLE insertstaticpartitiondynamic (designation String, doj Timestamp,salary int)
+        | PARTITIONED BY (empno int, empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno, empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    val rows = sql(s"select count(*) from insertstaticpartitiondynamic").collect()
+    sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname) select designation, doj, salary, empname from insertstaticpartitiondynamic""")
+
+    checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno=1"), rows)
+
+    intercept[Exception] {
+      sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno, empname='ravi') select designation, doj, salary, empname from insertstaticpartitiondynamic""")
+    }
+
+  }
+
+  test("overwriting all partition on table and do compaction") {
+    sql(
+      """
+        | CREATE TABLE partitionallcompaction (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='Learning', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"') """)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='configManagement', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='network', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='protocol', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='security', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql("ALTER TABLE partitionallcompaction COMPACT 'MAJOR'").collect()
+    checkExistence(sql(s"""SHOW segments for table partitionallcompaction"""), true, "Marked for Delete")
+  }
+
+  test("Test overwrite static partition ") {
+    sql(
+      """
+        | CREATE TABLE weather6 (type String)
+        | PARTITIONED BY (year int, month int, day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql("insert into weather6 partition(year=2014, month=5, day=25) select 'rainy'")
+    sql("insert into weather6 partition(year=2014, month=4, day=23) select 'cloudy'")
+    sql("insert overwrite table weather6 partition(year=2014, month=5, day=25) select 'sunny'")
+    checkExistence(sql("select * from weather6"), true, "sunny")
+    checkAnswer(sql("select count(*) from weather6"), Seq(Row(2)))
+  }
+
 
   override def afterAll = {
     dropTable
@@ -94,6 +164,10 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAfterAll
     sql("drop table if exists originTable")
     sql("drop table if exists partitiondateinsert")
     sql("drop table if exists staticpartitiondateinsert")
+    sql("drop table if exists loadstaticpartitiondynamic")
+    sql("drop table if exists insertstaticpartitiondynamic")
+    sql("drop table if exists partitionallcompaction")
+    sql("drop table if exists weather6")
   }
 
 }
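
The intercept[Exception] in the overwrite test above pins down the ordering rule for mixed specs: PARTITION(empno='1', empname), with the static value first, is supported, while PARTITION(empno, empname='ravi'), a dynamic column before a static one, is expected to fail, presumably mirroring the Hive/Spark rule that static partition columns must precede dynamic ones. The two statements from the test, side by side:

    // supported: static column first, then dynamic
    sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname) select designation, doj, salary, empname from insertstaticpartitiondynamic""")
    // rejected: dynamic column before a static one
    sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno, empname='ravi') select designation, doj, salary, empname from insertstaticpartitiondynamic""")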

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index d377c4d..0a79295 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -37,13 +37,16 @@ case class CarbonDropPartition(rddId: Int, val idx: Int, segmentPath: String)
  * @param sc
  * @param tablePath
  * @param segments segments to be merged
+ * @param partialMatch If true, entries that match even a partial partition spec can also be
+ *                      dropped
  */
 class CarbonDropPartitionRDD(
     sc: SparkContext,
     tablePath: String,
     segments: Seq[String],
     partitions: Seq[String],
-    uniqueId: String)
+    uniqueId: String,
+    partialMatch: Boolean)
   extends CarbonRDD[String](sc, Nil) {
 
   override def getPartitions: Array[Partition] = {
@@ -60,7 +63,8 @@ class CarbonDropPartitionRDD(
       new PartitionMapFileStore().dropPartitions(
         split.segmentPath,
         partitions.toList.asJava,
-        uniqueId)
+        uniqueId,
+        partialMatch)
 
       var havePair = false
       var finished = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 66351e3..3da603b 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -998,10 +998,10 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       case _ => ("", "")
     }
 
-  protected lazy val partitions: Parser[(String, String)] =
-    (ident <~ "=") ~ stringLit ^^ {
+  protected lazy val partitions: Parser[(String, Option[String])] =
+    (ident <~ "=".?) ~ stringLit.? ^^ {
       case opt ~ optvalue => (opt.trim, optvalue)
-      case _ => ("", "")
+      case _ => ("", None)
     }
 
   protected lazy val valueOptions: Parser[(Int, Int)] =
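
With both the "=" and the value now optional, the parser yields Option[String] values: Some(value) for static partition columns and None for dynamic ones. A small sketch of the resulting shape (the literal values are illustrative only):

    // PARTITION(empno='1', empname)  ->  static empno, dynamic empname
    val partitionSpec: Map[String, Option[String]] =
      Map("empno" -> Some("1"), "empname" -> None)

    // Static columns carry Some(value); dynamic columns carry None and are
    // resolved from the incoming data during load.
    val (staticCols, dynamicCols) = partitionSpec.partition(_._2.isDefined)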

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 0c6879c..60adcd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -602,7 +602,7 @@ case class CarbonLoadDataCommand(
          val lowerCasePartition = partition.map{case(key, value) => (key.toLowerCase, value)}
           catalogTable.schema.map { attr =>
             attributes.find(_.name.equalsIgnoreCase(attr.name)).get
-          }.filter(attr => lowerCasePartition.get(attr.name.toLowerCase).isEmpty)
+          }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty)
         } else {
           catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
         }
@@ -713,7 +713,12 @@ case class CarbonLoadDataCommand(
     options += (("onepass", loadModel.getUseOnePass.toString))
     options += (("dicthost", loadModel.getDictionaryServerHost))
     options += (("dictport", loadModel.getDictionaryServerPort.toString))
-    options += (("staticpartition", partition.nonEmpty.toString))
+    if (partition.nonEmpty) {
+      val staticPartitionStr = ObjectSerializationUtil.convertObjectToString(
+        new util.HashMap[String, Boolean](
+          partition.map{case (col, value) => (col.toLowerCase, value.isDefined)}.asJava))
+      options += (("staticpartition", staticPartitionStr))
+    }
     options += (("operationcontext", operationContextStr))
     options ++= this.options
     if (updateModel.isDefined) {
@@ -745,63 +750,80 @@ case class CarbonLoadDataCommand(
       sparkSession: SparkSession,
       table: CarbonTable,
       logicalPlan: LogicalPlan): Unit = {
-    sparkSession.sessionState.catalog.listPartitions(
-      TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
-      Some(partition.map(f => (f._1, f._2.get))))
-    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
+    val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
+    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
+      identifier,
+      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
+    val partitionNames = existingPartitions.toList.flatMap { partition =>
+      partition.spec.seq.map{case (column, value) => column + "=" + value}
+    }.toSet
     val uniqueId = System.currentTimeMillis().toString
     val segments = new SegmentStatusManager(
       table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
-    try {
-      // First drop the partitions from partition mapper files of each segment
-      new CarbonDropPartitionRDD(
-        sparkSession.sparkContext,
-        table.getTablePath,
-        segments.asScala,
-        partitionNames.toSeq,
-        uniqueId).collect()
-    } catch {
-      case e: Exception =>
-        // roll back the drop partitions from carbon store
-        new CarbonDropPartitionCommitRDD(
+    // If any existing partitions need to be overwritten, drop them from the partition map first
+    if (partitionNames.nonEmpty) {
+      try {
+        // First drop the partitions from partition mapper files of each segment
+        new CarbonDropPartitionRDD(
           sparkSession.sparkContext,
           table.getTablePath,
           segments.asScala,
-          false,
-          uniqueId).collect()
-        throw e
-    }
+          partitionNames.toSeq,
+          uniqueId,
+          partialMatch = false).collect()
+      } catch {
+        case e: Exception =>
+          // roll back the drop partitions from carbon store
+          new CarbonDropPartitionCommitRDD(
+            sparkSession.sparkContext,
+            table.getTablePath,
+            segments.asScala,
+            success = false,
+            uniqueId).collect()
+          throw e
+      }
 
-    try {
+      try {
+        Dataset.ofRows(sparkSession, logicalPlan)
+      } catch {
+        case e: Exception =>
+          // roll back the drop partitions from carbon store
+          new CarbonDropPartitionCommitRDD(
+            sparkSession.sparkContext,
+            table.getTablePath,
+            segments.asScala,
+            success = false,
+            uniqueId).collect()
+          throw e
+      }
+      // Commit the removed partitions in carbon store.
+      new CarbonDropPartitionCommitRDD(
+        sparkSession.sparkContext,
+        table.getTablePath,
+        segments.asScala,
+        success = true,
+        uniqueId).collect()
+      // get valid segments
+      val validsegments =
+        new SegmentStatusManager(
+          table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
+      // Update the loadstatus with update time to clear cache from driver.
+      CarbonUpdateUtil.updateTableMetadataStatus(
+        new util.HashSet[String](validsegments),
+        table,
+        uniqueId,
+        true,
+        new util.ArrayList[String])
+      DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
+      // Clean the overwriting segments if any.
+      new PartitionMapFileStore().cleanSegments(
+        table,
+        CarbonFilters.getPartitions(Seq.empty, sparkSession, identifier).asJava,
+        false)
+    } else {
+      // Otherwise it's a normal load
       Dataset.ofRows(sparkSession, logicalPlan)
-    } catch {
-      case e: Exception =>
-        // roll back the drop partitions from carbon store
-        new CarbonDropPartitionCommitRDD(
-          sparkSession.sparkContext,
-          table.getTablePath,
-          segments.asScala,
-          false,
-          uniqueId).collect()
-        throw e
     }
-    // Commit the removed partitions in carbon store.
-    new CarbonDropPartitionCommitRDD(
-      sparkSession.sparkContext,
-      table.getTablePath,
-      segments.asScala,
-      true,
-      uniqueId).collect()
-    // Update the loadstatus with update time to clear cache from driver.
-    val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
-      .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
-    CarbonUpdateUtil.updateTableMetadataStatus(
-      segmentSet,
-      table,
-      uniqueId,
-      true,
-      new util.ArrayList[String])
-    DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
   }
 
   def getDataFrameWithTupleID(): DataFrame = {
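
The "staticpartition" load option above changes from a plain boolean to a serialized map of lower-cased partition column name to whether a static value was supplied; CarbonOutputWriter decodes it again to decide, per column, how to convert values. A sketch of the round trip, reusing the ObjectSerializationUtil imported elsewhere in this commit (the example partition spec is illustrative only):

    import java.util
    import scala.collection.JavaConverters._
    import org.apache.carbondata.hadoop.util.ObjectSerializationUtil

    val partition: Map[String, Option[String]] = Map("empno" -> Some("1"), "empname" -> None)

    // Writer side (CarbonLoadDataCommand): column -> "was a static value supplied?"
    val staticPartitionStr = ObjectSerializationUtil.convertObjectToString(
      new util.HashMap[String, Boolean](
        partition.map { case (col, value) => (col.toLowerCase, value.isDefined) }.asJava))

    // Reader side (CarbonOutputWriter): decode and look up per column.
    val staticPartition = ObjectSerializationUtil
      .convertStringToObject(staticPartitionStr)
      .asInstanceOf[util.HashMap[String, Boolean]]
    staticPartition.getOrDefault("empno", false)   // true  (static)
    staticPartition.getOrDefault("empname", false) // false (dynamic)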

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index c68e43c..dbd686b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -25,11 +25,13 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
+import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.PartitionMapFileStore
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
@@ -73,15 +75,19 @@ case class CarbonAlterTableDropHivePartitionCommand(
       }
 
       // Drop the partitions from hive.
-      AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
-        .run(sparkSession)
+      AlterTableDropPartitionCommand(
+        tableName,
+        specs,
+        ifExists,
+        purge,
+        retainData).run(sparkSession)
     }
     Seq.empty[Row]
   }
 
 
  override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
-    AlterTableAddPartitionCommand(tableName, specs.map((_, None)), ifExists)
+    AlterTableAddPartitionCommand(tableName, specs.map((_, None)), true)
     val msg = s"Got exception $exception when processing data of drop partition." +
               "Adding back partitions to the metadata"
     LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
@@ -114,7 +120,8 @@ case class CarbonAlterTableDropHivePartitionCommand(
           table.getTablePath,
           segments.asScala,
           partitionNames.toSeq,
-          uniqueId).collect()
+          uniqueId,
+          partialMatch = true).collect()
       } catch {
         case e: Exception =>
           // roll back the drop partitions from carbon store
@@ -143,6 +150,10 @@ case class CarbonAlterTableDropHivePartitionCommand(
       DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
     } finally {
       AlterTableUtil.releaseLocks(locks)
+      new PartitionMapFileStore().cleanSegments(
+        table,
+        new util.ArrayList(CarbonFilters.getPartitions(Seq.empty, sparkSession, tableName).asJava),
+        false)
     }
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 4b368de..36df787 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
@@ -204,9 +205,14 @@ private class CarbonOutputWriter(path: String,
     fieldTypes: Seq[DataType])
   extends OutputWriter with AbstractCarbonOutputWriter {
   val partitions = getPartitionsFromPath(path, context).map(ExternalCatalogUtils.unescapePathName)
-  val staticPartition = {
+  val staticPartition: util.HashMap[String, Boolean] = {
     val staticPart = context.getConfiguration.get("carbon.staticpartition")
-    staticPart != null && staticPart.toBoolean
+    if (staticPart != null) {
+      ObjectSerializationUtil.convertStringToObject(
+        staticPart).asInstanceOf[util.HashMap[String, Boolean]]
+    } else {
+      null
+    }
   }
   lazy val partitionData = if (partitions.nonEmpty) {
     val updatedPartitions = partitions.map{ p =>
@@ -223,7 +229,7 @@ private class CarbonOutputWriter(path: String,
       }
     }
 
-    if (staticPartition) {
+    if (staticPartition != null) {
       val loadModel = recordWriter.getLoadModel
       val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
       var timeStampformatString = loadModel.getTimestampformat
@@ -237,11 +243,17 @@ private class CarbonOutputWriter(path: String,
       }
       val dateFormat = new SimpleDateFormat(dateFormatString)
       updatedPartitions.map {case (col, value) =>
-        CarbonScalaUtil.convertToCarbonFormat(value,
-          CarbonScalaUtil.convertCarbonToSparkDataType(
-            table.getColumnByName(table.getTableName, col).getDataType),
-          timeFormat,
-          dateFormat)
+        // Only convert the static partitions to the carbon format and use it while loading data
+        // to carbon.
+        if (staticPartition.getOrDefault(col, false)) {
+          CarbonScalaUtil.convertToCarbonFormat(value,
+            CarbonScalaUtil.convertCarbonToSparkDataType(
+              table.getColumnByName(table.getTableName, col).getDataType),
+            timeFormat,
+            dateFormat)
+        } else {
+          value
+        }
       }
     } else {
       updatedPartitions.map(_._2)
@@ -309,9 +321,25 @@ private class CarbonOutputWriter(path: String,
       (col, value)
     }.toMap
     val updatedPartitions =
-      if (staticPartition) {
-        splitPartitions
+      if (staticPartition != null) {
+        // There can be a combination of dynamic and static partitions. In that case convert
+        // only the dynamic partition values to the proper format and store them in the carbon partition map
+        splitPartitions.map { case (col, value) =>
+          if (!staticPartition.getOrDefault(col, false)) {
+            CarbonScalaUtil.updatePartitions(
+              Seq((col, value)).toMap,
+              table,
+              timeFormat,
+              dateFormat,
+              serializeFormat,
+              badRecordAction,
+              isEmptyBadRecord).toSeq.head
+          } else {
+            (col, value)
+          }
+        }
       } else {
+        // All dynamic partitions need to be converted to proper format
         CarbonScalaUtil.updatePartitions(
           splitPartitions,
           table,
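
Taken together, the two hunks above apply opposite conversions depending on whether a column was static: when building the load values, only static columns (taken verbatim from the SQL text) are converted to carbon's internal format, and when building the partition map entries, only dynamic columns are converted. A compact sketch of that per-column branch; isStatic and convert stand in for staticPartition.getOrDefault(col, false) and the CarbonScalaUtil calls:

    def convertPartitionValues(values: Map[String, String],
        isStatic: String => Boolean,
        convert: (String, String) => String,
        convertStaticOnly: Boolean): Map[String, String] =
      values.map { case (col, value) =>
        // Static values come from the SQL literal, dynamic values from the data path,
        // so each code path converts only the side that still needs it.
        val needsConversion = if (convertStaticOnly) isStatic(col) else !isStatic(col)
        col -> (if (needsConversion) convert(col, value) else value)
      }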

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index a25be06..4045478 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -361,7 +361,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           validateOptions(optionsList)
         }
         val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
-        val partitionSpec = partitions.getOrElse(List.empty[(String, String)]).toMap
+        val partitionSpec = partitions.getOrElse(List.empty[(String, Option[String])]).toMap
         CarbonLoadDataCommand(
           databaseNameOp = convertDbNameToLowerCase(databaseNameOp),
           tableName = tableName,
@@ -374,7 +374,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           updateModel = None,
           tableInfoOp = None,
           internalOptions = Map.empty,
-          partition = partitionSpec.map { case (key, value) => (key, Some(value))})
+          partition = partitionSpec)
     }
 
   protected lazy val deleteLoadsByID: Parser[LogicalPlan] =

