spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: [SPARK-21648][SQL] Fix confusing assert failure in JDBC source when parallel fetching parameters are not properly provided.
Date Mon, 07 Aug 2017 20:04:26 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 fa92a7be7 -> a1c1199e1


[SPARK-21648][SQL] Fix confusing assert failure in JDBC source when parallel fetching parameters are not properly provided.

### What changes were proposed in this pull request?
```SQL
CREATE TABLE mytesttable1
USING org.apache.spark.sql.jdbc
  OPTIONS (
  url 'jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}',
  dbtable 'mytesttable1',
  paritionColumn 'state_id',
  lowerBound '0',
  upperBound '52',
  numPartitions '53',
  fetchSize '10000'
)
```

The option name `paritionColumn` above is misspelled, which means users never actually provided
a value for `partitionColumn`. In that case, they hit a confusing assertion failure:

```
AssertionError: assertion failed
java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:156)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:39)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:312)
```
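
For context, the same misconfiguration can be reproduced through the DataFrameReader API. The sketch below is illustrative only; the URL, table, and column names are placeholders rather than part of this patch:

```scala
// Hypothetical reproduction via DataFrameReader; connection details are placeholders.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://host:3306/db")
  .option("dbtable", "mytesttable1")
  .option("paritionColumn", "state_id") // misspelled, so no partition column is actually set
  .option("lowerBound", "0")
  .option("upperBound", "52")
  .option("numPartitions", "53")
  .load()
// Before this patch: a bare java.lang.AssertionError from JdbcRelationProvider.
// After this patch: an IllegalArgumentException naming the all-or-none option set.
```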

### How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18864 from gatorsmile/jdbcPartCol.

(cherry picked from commit baf5cac0f8c35925c366464d7e0eb5f6023fce57)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1c1199e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1c1199e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1c1199e

Branch: refs/heads/branch-2.2
Commit: a1c1199e122889ed34415be5e4da67168107a595
Parents: fa92a7b
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Mon Aug 7 13:04:04 2017 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Mon Aug 7 13:04:22 2017 -0700

----------------------------------------------------------------------
 .../datasources/jdbc/JDBCOptions.scala          | 11 ++++++----
 .../datasources/jdbc/JdbcRelationProvider.scala |  9 ++++++--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 22 ++++++++++++++++++++
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  |  5 +++--
 4 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a1c1199e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 591096d..96a8a51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -97,10 +97,13 @@ class JDBCOptions(
   val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong)
   // the upper bound of the partition column
   val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong)
-  require(partitionColumn.isEmpty ||
-    (lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined),
-    s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," +
-      s" and '$JDBC_NUM_PARTITIONS' are required.")
+
+  // numPartitions is also used for data source writing
+  require((partitionColumn.isEmpty && lowerBound.isEmpty && upperBound.isEmpty) ||
+    (partitionColumn.isDefined && lowerBound.isDefined && upperBound.isDefined &&
+      numPartitions.isDefined),
+    s"When reading JDBC data sources, users need to specify all or none for the following " +
+      s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " +
+      s"and '$JDBC_NUM_PARTITIONS'")
   val fetchSize = {
     val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
     require(size >= 0,
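
The new guard is deliberately all-or-none for the read-side options, while `numPartitions` may still be set on its own because it is also used when writing. A self-contained sketch of the same predicate, with simplified names that are not Spark's actual internals:

```scala
// Illustration only: mirrors the require() above with simplified names.
def validatePartitioning(
    partitionColumn: Option[String],
    lowerBound: Option[Long],
    upperBound: Option[Long],
    numPartitions: Option[Int]): Unit = {
  val readOpts: Seq[Option[Any]] = Seq(partitionColumn, lowerBound, upperBound)
  require(readOpts.forall(_.isEmpty) ||
    (readOpts.forall(_.isDefined) && numPartitions.isDefined),
    "When reading JDBC data sources, users need to specify all or none for the following " +
      "options: 'partitionColumn', 'lowerBound', 'upperBound', and 'numPartitions'")
}
```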

http://git-wip-us.apache.org/repos/asf/spark/blob/a1c1199e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 74dcfb0..37e7bb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -29,6 +29,8 @@ class JdbcRelationProvider extends CreatableRelationProvider
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
+    import JDBCOptions._
+
     val jdbcOptions = new JDBCOptions(parameters)
     val partitionColumn = jdbcOptions.partitionColumn
     val lowerBound = jdbcOptions.lowerBound
@@ -36,10 +38,13 @@ class JdbcRelationProvider extends CreatableRelationProvider
     val numPartitions = jdbcOptions.numPartitions
 
     val partitionInfo = if (partitionColumn.isEmpty) {
-      assert(lowerBound.isEmpty && upperBound.isEmpty)
+      assert(lowerBound.isEmpty && upperBound.isEmpty, "When 'partitionColumn' is not specified, " +
+        s"'$JDBC_LOWER_BOUND' and '$JDBC_UPPER_BOUND' are expected to be empty")
       null
     } else {
-      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty)
+      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty,
+        s"When 'partitionColumn' is specified, '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND',
and " +
+          s"'$JDBC_NUM_PARTITIONS' are also required")
       JDBCPartitioningInfo(
         partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get)
     }
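
Once these assertions pass, the four values drive stride-based range partitioning, producing one WHERE clause per partition. The real logic lives in `JDBCRelation.columnPartition`; the sketch below only conveys the idea and glosses over details such as stride rounding:

```scala
// Simplified sketch of stride-based partition predicates; not Spark's exact code.
def whereClauses(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
  require(numPartitions >= 2, "sketch assumes at least two partitions")
  val stride = (upper - lower) / numPartitions
  (0 until numPartitions).map { i =>
    val lo = lower + i * stride
    val hi = lo + stride
    if (i == 0) s"$column < $hi or $column is null" // first partition also catches NULLs
    else if (i == numPartitions - 1) s"$column >= $lo" // last partition is open-ended
    else s"$column >= $lo AND $column < $hi"
  }
}
```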

http://git-wip-us.apache.org/repos/asf/spark/blob/a1c1199e/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index daec409..ae4b1bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -412,6 +412,28 @@ class JDBCSuite extends SparkFunSuite
     assert(e.contains("Invalid value `-1` for parameter `fetchsize`"))
   }
 
+  test("Missing partition columns") {
+    withView("tempPeople") {
+      val e = intercept[IllegalArgumentException] {
+        sql(
+          s"""
+             |CREATE OR REPLACE TEMPORARY VIEW tempPeople
+             |USING org.apache.spark.sql.jdbc
+             |OPTIONS (
+             |  url 'jdbc:h2:mem:testdb0;user=testUser;password=testPass',
+             |  dbtable 'TEST.PEOPLE',
+             |  lowerBound '0',
+             |  upperBound '52',
+             |  numPartitions '53',
+             |  fetchSize '10000' )
+           """.stripMargin.replaceAll("\n", " "))
+      }.getMessage
+      assert(e.contains("When reading JDBC data sources, users need to specify all or none
" +
+        "for the following options: 'partitionColumn', 'lowerBound', 'upperBound', and "
+
+        "'numPartitions'"))
+    }
+  }
+
   test("Basic API with FetchSize") {
     (0 to 4).foreach { size =>
       val properties = new Properties()

http://git-wip-us.apache.org/repos/asf/spark/blob/a1c1199e/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index bf1fd16..d7ae456 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -323,8 +323,9 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
         .option("partitionColumn", "foo")
         .save()
     }.getMessage
-    assert(e.contains("If 'partitionColumn' is specified then 'lowerBound', 'upperBound',"
+
-      " and 'numPartitions' are required."))
+    assert(e.contains("When reading JDBC data sources, users need to specify all or none
" +
+      "for the following options: 'partitionColumn', 'lowerBound', 'upperBound', and " +
+      "'numPartitions'"))
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {

