spark-commits mailing list archives

From gurwls...@apache.org
Subject [spark] branch branch-3.1 updated: [SPARK-34143][SQL][TESTS][3.1] Fix adding partitions to fully partitioned v2 tables
Date Wed, 20 Jan 2021 01:10:36 GMT
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 7014696  [SPARK-34143][SQL][TESTS][3.1] Fix adding partitions to fully partitioned v2 tables
7014696 is described below

commit 70146962301e8a99df696cce50dc171fb982d425
Author: Max Gekk <max.gekk@gmail.com>
AuthorDate: Wed Jan 20 10:10:12 2021 +0900

    [SPARK-34143][SQL][TESTS][3.1] Fix adding partitions to fully partitioned v2 tables
    
    ### What changes were proposed in this pull request?
    While adding a new partition to the v2 `InMemoryAtomicPartitionTable`/`InMemoryPartitionTable`, add a single row to the table content when the table is fully partitioned.
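    
    The gist, as a standalone sketch (a simplified model in plain Scala, not the actual Spark test classes; the real change below uses `BufferedRows` and `InternalRow`): when every column is a partition column, the partition key values already form a complete row, so creating the partition can materialize that row directly.
    ```scala
    // Minimal, self-contained model of the fix (hypothetical names).
    import scala.collection.mutable
    
    val schema = Seq("p0", "p1") // fully partitioned: all columns are partition columns
    val dataMap = mutable.Map.empty[Seq[Any], Seq[Seq[Any]]]
    
    def createPartitionKey(key: Seq[Any]): Unit =
      if (!dataMap.contains(key)) {
        // When the key covers the whole schema, the key itself is the only possible row.
        val rows = if (key.length == schema.length) Seq(key) else Seq.empty[Seq[Any]]
        dataMap(key) = rows
      }
    
    createPartitionKey(Seq(0, "abc"))
    assert(dataMap(Seq(0, "abc")) == Seq(Seq(0, "abc"))) // one row materialized
    ```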
    
    ### Why are the changes needed?
    The `ALTER TABLE .. ADD PARTITION` command does not change the content of a fully partitioned v2 table. For instance, `INSERT INTO` changes the table content:
    ```scala
          sql(s"CREATE TABLE t (p0 INT, p1 STRING) USING _ PARTITIONED BY (p0, p1)")
          sql(s"INSERT INTO t SELECT 1, 'def'")
          sql(s"SELECT * FROM t").show(false)
    
    +---+---+
    |p0 |p1 |
    +---+---+
    |1  |def|
    +---+---+
    ```
    but `ALTER TABLE .. ADD PARTITION` doesn't change the v2 table content (see the sketch after this example):
    ```scala
          sql(s"ALTER TABLE t ADD PARTITION (p0 = 0, p1 = 'abc')")
          sql(s"SELECT * FROM t").show(false)
    
    +---+---+
    |p0 |p1 |
    +---+---+
    +---+---+
    ```
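    
    Conceptually, the pre-fix code only recorded the partition in the partition-metadata map (`memoryTablePartitions` in the diff below) and never touched `dataMap`, which is why `SELECT` returned nothing. Again a simplified standalone sketch, not the actual classes:
    ```scala
    // Simplified pre-fix behavior: ADD PARTITION records metadata only.
    import scala.collection.mutable
    
    val partitions = mutable.Map.empty[Seq[Any], Map[String, String]] // partition metadata
    val dataMap    = mutable.Map.empty[Seq[Any], Seq[Seq[Any]]]       // actual rows
    
    def addPartitionOld(ident: Seq[Any]): Unit =
      partitions(ident) = Map.empty // dataMap untouched
    
    addPartitionOld(Seq(0, "abc"))
    assert(dataMap.isEmpty) // table content unchanged, so SELECT shows no rows
    ```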
    
    ### Does this PR introduce _any_ user-facing change?
    No, the changes impact only tests. For the example above, the result in tests becomes:
    ```scala
          sql(s"ALTER TABLE t ADD PARTITION (p0 = 0, p1 = 'abc')")
          sql(s"SELECT * FROM t").show(false)
    
    +---+---+
    |p0 |p1 |
    +---+---+
    |0  |abc|
    +---+---+
    ```
    
    ### How was this patch tested?
    By running the new test:
    ```
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTablePartitionV2SQLSuite"
    ```
    
    Authored-by: Max Gekk <max.gekk@gmail.com>
    Signed-off-by: Wenchen Fan <wenchen@databricks.com>
    (cherry picked from commit a98e77c1138c0f232ac99f151ef2ecbf332b6fbc)
    Signed-off-by: Max Gekk <max.gekk@gmail.com>
    
    Closes #31247 from MaxGekk/add-partition-by-all-columns-3.1.
    
    Authored-by: Max Gekk <max.gekk@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls223@apache.org>
---
 .../sql/connector/InMemoryAtomicPartitionTable.scala      |  1 +
 .../spark/sql/connector/InMemoryPartitionTable.scala      |  1 +
 .../org/apache/spark/sql/connector/InMemoryTable.scala    | 10 ++++++++++
 .../sql/connector/AlterTablePartitionV2SQLSuite.scala     | 15 ++++++++++++++-
 4 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala
index c2a95cc..b0501b1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala
@@ -42,6 +42,7 @@ class InMemoryAtomicPartitionTable (
     if (memoryTablePartitions.containsKey(ident)) {
       throw new PartitionAlreadyExistsException(name, ident, partitionSchema)
     } else {
+      createPartitionKey(ident.toSeq(schema))
       memoryTablePartitions.put(ident, properties)
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
index e29c78c..377842a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
@@ -54,6 +54,7 @@ class InMemoryPartitionTable(
     if (memoryTablePartitions.containsKey(ident)) {
       throw new PartitionAlreadyExistsException(name, ident, partitionSchema)
     } else {
+      createPartitionKey(ident.toSeq(schema))
       memoryTablePartitions.put(ident, properties)
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index c4c5835..aac3077 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -165,6 +165,16 @@ class InMemoryTable(
 
   protected def addPartitionKey(key: Seq[Any]): Unit = {}
 
+  protected def createPartitionKey(key: Seq[Any]): Unit = dataMap.synchronized {
+    if (!dataMap.contains(key)) {
+      val emptyRows = new BufferedRows(key.toArray.mkString("/"))
+      val rows = if (key.length == schema.length) {
+        emptyRows.withRow(InternalRow.fromSeq(key))
+      } else emptyRows
+      dataMap.put(key, rows)
+    }
+  }
+
   def withData(data: Array[BufferedRows]): InMemoryTable = dataMap.synchronized {
     data.foreach(_.rows.foreach { row =>
       val key = getKey(row)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 9987043..fcc6a67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector
 
 import java.time.{LocalDate, LocalDateTime}
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
@@ -296,4 +296,17 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       assert(!partTable.partitionExists(InternalRow(null)))
     }
   }
+
+  test("SPARK-34143: add a partition to fully partitioned table") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (p0 INT, p1 STRING) USING foo PARTITIONED BY (p0, p1)")
+      sql(s"ALTER TABLE $t ADD PARTITION (p0 = 0, p1 = 'abc')")
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
+        .asPartitionable
+      assert(partTable.partitionExists(InternalRow(0, UTF8String.fromString("abc"))))
+      checkAnswer(sql(s"SELECT * FROM $t"), Row(0, "abc"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

