spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [SPARK-16222][SQL] JDBC Sources - Handling illegal input values for `fetchsize` and `batchsize`
Date Fri, 01 Jul 2016 08:54:15 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0b64543c5 -> 3665927c6


[SPARK-16222][SQL] JDBC Sources - Handling illegal input values for `fetchsize` and `batchsize`

#### What changes were proposed in this pull request?
For JDBC data sources, users can specify `batchsize` for multi-row inserts and `fetchsize`
for multi-row fetch. A few issues exist:

- The property keys are case sensitive. Thus, the existing test cases for `fetchsize` use
incorrect names, `fetchSize`. Basically, the test cases are broken.
- No test case exists for `batchsize`.
- We do not detect the illegal input values for `fetchsize` and `batchsize`.

For example, when `batchsize` is zero, we got the following exception:
```
Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ArithmeticException: / by zero
```
when `fetchsize` is less than zero, we got the exception from the underlying JDBC driver:
```
Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 0.0 (TID 0, localhost): org.h2.jdbc.JdbcSQLException: Invalid value
"-1" for parameter "rows" [90008-183]
```

This PR fixes all the above issues, and issue the appropriate exceptions when detecting the
illegal inputs for `fetchsize` and `batchsize`. Also update the function descriptions.

#### How was this patch tested?
Test cases are fixed and added.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13919 from gatorsmile/jdbcProperties.

(cherry picked from commit 0ad6ce7e54b1d8f5946dde652fa5341d15059158)
Signed-off-by: Sean Owen <sowen@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3665927c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3665927c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3665927c

Branch: refs/heads/branch-2.0
Commit: 3665927c6f5fa4794a59718fd2d339310c70a985
Parents: 0b64543
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Fri Jul 1 09:54:02 2016 +0100
Committer: Sean Owen <sowen@cloudera.com>
Committed: Fri Jul 1 09:54:10 2016 +0100

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameReader.scala  |  6 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  3 +-
 .../execution/datasources/jdbc/JDBCRDD.scala    |  6 +-
 .../execution/datasources/jdbc/JdbcUtils.scala  | 10 +++-
 .../apache/spark/sql/jdbc/PostgresDialect.scala |  2 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 62 ++++++++++++--------
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  | 54 ++++++++++++-----
 7 files changed, 98 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3665927c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 35ba522..e8c2885 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -177,7 +177,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends
Logging {
    *                      clause expressions used to split the column `columnName` evenly.
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary
string
    *                             tag/value. Normally at least a "user" and "password" property
-   *                             should be included.
+   *                             should be included. "fetchsize" can be used to control the
+   *                             number of rows per fetch.
    * @since 1.4.0
    */
   def jdbc(
@@ -207,7 +208,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends
Logging {
    * @param predicates Condition in the where clause for each partition.
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary
string
    *                             tag/value. Normally at least a "user" and "password" property
-   *                             should be included.
+   *                             should be included. "fetchsize" can be used to control the
+   *                             number of rows per fetch.
    * @since 1.4.0
    */
   def jdbc(

http://git-wip-us.apache.org/repos/asf/spark/blob/3665927c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ca3972d..f77af76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -391,7 +391,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @param table Name of the table in the external database.
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary
string
    *                             tag/value. Normally at least a "user" and "password" property
-   *                             should be included.
+   *                             should be included. "batchsize" can be used to control the
+   *                             number of rows per insert.
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/3665927c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 44cfbb9..24e2c1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -390,7 +390,11 @@ private[sql] class JDBCRDD(
     val sqlText = s"SELECT $columnList FROM $fqTable $myWhereClause"
     val stmt = conn.prepareStatement(sqlText,
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
-    val fetchSize = properties.getProperty("fetchsize", "0").toInt
+    val fetchSize = properties.getProperty(JdbcUtils.JDBC_BATCH_FETCH_SIZE, "0").toInt
+    require(fetchSize >= 0,
+      s"Invalid value `${fetchSize.toString}` for parameter " +
+      s"`${JdbcUtils.JDBC_BATCH_FETCH_SIZE}`. The minimum value is 0. When the value is 0,
" +
+      "the JDBC driver ignores the value and does the estimates.")
     stmt.setFetchSize(fetchSize)
     val rs = stmt.executeQuery()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3665927c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 065c857..3529ee6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -34,6 +34,10 @@ import org.apache.spark.sql.types._
  */
 object JdbcUtils extends Logging {
 
+  // the property names are case sensitive
+  val JDBC_BATCH_FETCH_SIZE = "fetchsize"
+  val JDBC_BATCH_INSERT_SIZE = "batchsize"
+
   /**
    * Returns a factory for creating connections to the given JDBC URL.
    *
@@ -154,6 +158,10 @@ object JdbcUtils extends Logging {
       nullTypes: Array[Int],
       batchSize: Int,
       dialect: JdbcDialect): Iterator[Byte] = {
+    require(batchSize >= 1,
+      s"Invalid value `${batchSize.toString}` for parameter " +
+      s"`${JdbcUtils.JDBC_BATCH_INSERT_SIZE}`. The minimum value is 1.")
+
     val conn = getConnection()
     var committed = false
     val supportsTransactions = try {
@@ -275,7 +283,7 @@ object JdbcUtils extends Logging {
 
     val rddSchema = df.schema
     val getConnection: () => Connection = createConnectionFactory(url, properties)
-    val batchSize = properties.getProperty("batchsize", "1000").toInt
+    val batchSize = properties.getProperty(JDBC_BATCH_INSERT_SIZE, "1000").toInt
     df.foreachPartition { iterator =>
       savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3665927c/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 2d6c397..6baf1b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -89,7 +89,7 @@ private object PostgresDialect extends JdbcDialect {
     //
     // See: https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
     //
-    if (properties.getOrElse("fetchsize", "0").toInt > 0) {
+    if (properties.getOrElse(JdbcUtils.JDBC_BATCH_FETCH_SIZE, "0").toInt > 0) {
       connection.setAutoCommit(false)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3665927c/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index fd6671a..11e66ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -24,12 +24,13 @@ import java.util.{Calendar, GregorianCalendar, Properties}
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -83,7 +84,7 @@ class JDBCSuite extends SparkFunSuite
         |CREATE TEMPORARY TABLE fetchtwo
         |USING org.apache.spark.sql.jdbc
         |OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass',
-        |         fetchSize '2')
+        |         ${JdbcUtils.JDBC_BATCH_FETCH_SIZE} '2')
       """.stripMargin.replaceAll("\n", " "))
 
     sql(
@@ -348,38 +349,49 @@ class JDBCSuite extends SparkFunSuite
 
   test("Basic API") {
     assert(spark.read.jdbc(
-      urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3)
+      urlWithUserAndPass, "TEST.PEOPLE", new Properties()).collect().length === 3)
+  }
+
+  test("Basic API with illegal FetchSize") {
+    val properties = new Properties()
+    properties.setProperty(JdbcUtils.JDBC_BATCH_FETCH_SIZE, "-1")
+    val e = intercept[SparkException] {
+      spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", properties).collect()
+    }.getMessage
+    assert(e.contains("Invalid value `-1` for parameter `fetchsize`"))
   }
 
   test("Basic API with FetchSize") {
-    val properties = new Properties
-    properties.setProperty("fetchSize", "2")
-    assert(spark.read.jdbc(
-      urlWithUserAndPass, "TEST.PEOPLE", properties).collect().length === 3)
+    (0 to 4).foreach { size =>
+      val properties = new Properties()
+      properties.setProperty(JdbcUtils.JDBC_BATCH_FETCH_SIZE, size.toString)
+      assert(spark.read.jdbc(
+        urlWithUserAndPass, "TEST.PEOPLE", properties).collect().length === 3)
+    }
   }
 
   test("Partitioning via JDBCPartitioningInfo API") {
     assert(
-      spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)
+      spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties())
       .collect().length === 3)
   }
 
   test("Partitioning via list-of-where-clauses API") {
     val parts = Array[String]("THEID < 2", "THEID >= 2")
-    assert(spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties)
+    assert(spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
       .collect().length === 3)
   }
 
   test("Partitioning on column that might have null values.") {
     assert(
-      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "theid", 0, 4, 3, new Properties)
+      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "theid", 0, 4, 3, new Properties())
         .collect().length === 4)
     assert(
-      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "THEID", 0, 4, 3, new Properties)
+      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", "THEID", 0, 4, 3, new Properties())
         .collect().length === 4)
     // partitioning on a nullable quoted column
     assert(
-      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", """"Dept"""", 0, 4, 3, new Properties)
+      spark.read.jdbc(urlWithUserAndPass, "TEST.EMP", """"Dept"""", 0, 4, 3, new Properties())
         .collect().length === 4)
   }
 
@@ -391,7 +403,7 @@ class JDBCSuite extends SparkFunSuite
       lowerBound = 0,
       upperBound = 4,
       numPartitions = 0,
-      connectionProperties = new Properties
+      connectionProperties = new Properties()
     )
     assert(res.count() === 8)
   }
@@ -404,7 +416,7 @@ class JDBCSuite extends SparkFunSuite
       lowerBound = 1,
       upperBound = 5,
       numPartitions = 10,
-      connectionProperties = new Properties
+      connectionProperties = new Properties()
     )
     assert(res.count() === 8)
   }
@@ -417,7 +429,7 @@ class JDBCSuite extends SparkFunSuite
       lowerBound = 5,
       upperBound = 5,
       numPartitions = 4,
-      connectionProperties = new Properties
+      connectionProperties = new Properties()
     )
     assert(res.count() === 8)
   }
@@ -431,7 +443,7 @@ class JDBCSuite extends SparkFunSuite
         lowerBound = 5,
         upperBound = 1,
         numPartitions = 3,
-        connectionProperties = new Properties
+        connectionProperties = new Properties()
       )
     }.getMessage
     assert(e.contains("Operation not allowed: the lower bound of partitioning column " +
@@ -495,8 +507,8 @@ class JDBCSuite extends SparkFunSuite
 
   test("test DATE types") {
     val rows = spark.read.jdbc(
-      urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
-    val cachedRows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+      urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()
+    val cachedRows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
       .cache().collect()
     assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
     assert(rows(1).getAs[java.sql.Date](1) === null)
@@ -504,8 +516,8 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("test DATE types in cache") {
-    val rows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
-    spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+    val rows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()
+    spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
       .cache().createOrReplaceTempView("mycached_date")
     val cachedRows = sql("select * from mycached_date").collect()
     assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
@@ -514,7 +526,7 @@ class JDBCSuite extends SparkFunSuite
 
   test("test types for null value") {
     val rows = spark.read.jdbc(
-      urlWithUserAndPass, "TEST.NULLTYPES", new Properties).collect()
+      urlWithUserAndPass, "TEST.NULLTYPES", new Properties()).collect()
     assert((0 to 14).forall(i => rows(0).isNullAt(i)))
   }
 
@@ -560,7 +572,7 @@ class JDBCSuite extends SparkFunSuite
 
   test("Remap types via JdbcDialects") {
     JdbcDialects.registerDialect(testH2Dialect)
-    val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties)
+    val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
     assert(df.schema.filter(_.dataType != org.apache.spark.sql.types.StringType).isEmpty)
     val rows = df.collect()
     assert(rows(0).get(0).isInstanceOf[String])
@@ -694,7 +706,7 @@ class JDBCSuite extends SparkFunSuite
     // Regression test for bug SPARK-11788
     val timestamp = java.sql.Timestamp.valueOf("2001-02-20 11:22:33.543543");
     val date = java.sql.Date.valueOf("1995-01-01")
-    val jdbcDf = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+    val jdbcDf = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
     val rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect()
     assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
     assert(rows(0).getAs[java.sql.Timestamp](2)
@@ -714,7 +726,7 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("test credentials in the connection url are not in the plan output") {
-    val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties)
+    val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
     val explain = ExplainCommand(df.queryExecution.logical, extended = true)
     spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
       r => assert(!List("testPass", "testUser").exists(r.toString.contains))
@@ -746,7 +758,7 @@ class JDBCSuite extends SparkFunSuite
         urlWithUserAndPass,
         "TEST.PEOPLE",
         predicates = Array[String](jdbcPartitionWhereClause),
-        new Properties)
+        new Properties())
 
       df.createOrReplaceTempView("tempFrame")
       assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2")

http://git-wip-us.apache.org/repos/asf/spark/blob/3665927c/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index ff66f53..2c6449f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -22,7 +22,9 @@ import java.util.Properties
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -90,10 +92,34 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
   test("Basic CREATE") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
 
-    df.write.jdbc(url, "TEST.BASICCREATETEST", new Properties)
-    assert(2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count)
+    df.write.jdbc(url, "TEST.BASICCREATETEST", new Properties())
+    assert(2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties()).count())
     assert(
-      2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length)
+      2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties()).collect()(0).length)
+  }
+
+  test("Basic CREATE with illegal batchsize") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    (-1 to 0).foreach { size =>
+      val properties = new Properties()
+      properties.setProperty(JdbcUtils.JDBC_BATCH_INSERT_SIZE, size.toString)
+      val e = intercept[SparkException] {
+        df.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.BASICCREATETEST", properties)
+      }.getMessage
+      assert(e.contains(s"Invalid value `$size` for parameter `batchsize`"))
+    }
+  }
+
+  test("Basic CREATE with batchsize") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    (1 to 3).foreach { size =>
+      val properties = new Properties()
+      properties.setProperty(JdbcUtils.JDBC_BATCH_INSERT_SIZE, size.toString)
+      df.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.BASICCREATETEST", properties)
+      assert(2 === spark.read.jdbc(url, "TEST.BASICCREATETEST", new Properties()).count())
+    }
   }
 
   test("CREATE with overwrite") {
@@ -101,11 +127,11 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
 
     df.write.jdbc(url1, "TEST.DROPTEST", properties)
-    assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count)
+    assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
     assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
 
     df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties)
-    assert(1 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count)
+    assert(1 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
     assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
   }
 
@@ -113,10 +139,10 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
 
-    df.write.jdbc(url, "TEST.APPENDTEST", new Properties)
-    df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new Properties)
-    assert(3 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties).count)
-    assert(2 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties).collect()(0).length)
+    df.write.jdbc(url, "TEST.APPENDTEST", new Properties())
+    df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new Properties())
+    assert(3 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties()).count())
+    assert(2 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties()).collect()(0).length)
   }
 
   test("CREATE then INSERT to truncate") {
@@ -125,7 +151,7 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
 
     df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
     df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties)
-    assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count)
+    assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
     assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
   }
 
@@ -133,22 +159,22 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
 
-    df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
+    df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties())
     intercept[org.apache.spark.SparkException] {
-      df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
+      df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new Properties())
     }
   }
 
   test("INSERT to JDBC Datasource") {
     sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
-    assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
     assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
   }
 
   test("INSERT to JDBC Datasource with overwrite") {
     sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
     sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
-    assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count())
     assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message