spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: [SPARK-19296][SQL] Deduplicate url and table in JdbcUtils
Date Wed, 01 Feb 2017 17:43:43 GMT
Repository: spark
Updated Branches:
  refs/heads/master 04ee8cf63 -> 5ed397baa


[SPARK-19296][SQL] Deduplicate url and table in JdbcUtils

## What changes were proposed in this pull request?

This PR deduplicates the `url` and `table` arguments in `JdbcUtils` by carrying them in `JDBCOptions`.

It avoids passing duplicated arguments, for example, as below:

from

```scala
val jdbcOptions = new JDBCOptions(url, table, map)
JdbcUtils.saveTable(ds, url, table, jdbcOptions)
```

to

```scala
val jdbcOptions = new JDBCOptions(url, table, map)
JdbcUtils.saveTable(ds, jdbcOptions)
```
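
For context, the pattern can be sketched standalone (all names below are illustrative stand-ins, not Spark's actual API): a single options value owns `url` and `table`, so helpers read them from it instead of taking them as extra parameters.

```scala
// Illustrative sketch only; Spark's real JDBCOptions carries many more settings.
case class OptionsSketch(url: String, table: String, parameters: Map[String, String]) {
  def isTruncate: Boolean = parameters.getOrElse("truncate", "false").toBoolean
  def createTableOptions: String = parameters.getOrElse("createTableOptions", "")
}

// Before the change, a helper took (url, table, options), repeating state the
// options already held. After it, the single options argument is enough.
def saveTableSketch(rowCount: Int, options: OptionsSketch): Unit =
  println(s"Saving $rowCount rows to ${options.table} via ${options.url}")
```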

## How was this patch tested?

Running unit tests in `JdbcSuite`/`JDBCWriteSuite`.

Building with Scala 2.10 as below:

```bash
./dev/change-scala-version.sh 2.10
./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16753 from HyukjinKwon/SPARK-19296.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ed397ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ed397ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ed397ba

Branch: refs/heads/master
Commit: 5ed397baa758c29c54a853d3f8fee0ad44e97c14
Parents: 04ee8cf
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Wed Feb 1 09:43:35 2017 -0800
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Wed Feb 1 09:43:35 2017 -0800

----------------------------------------------------------------------
 .../datasources/jdbc/JdbcRelationProvider.scala | 34 +++++++++-----------
 .../execution/datasources/jdbc/JdbcUtils.scala  | 26 +++++++--------
 2 files changed, 28 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ed397ba/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index e39d936..88f6cb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -52,38 +52,34 @@ class JdbcRelationProvider extends CreatableRelationProvider
       mode: SaveMode,
       parameters: Map[String, String],
       df: DataFrame): BaseRelation = {
-    val jdbcOptions = new JDBCOptions(parameters)
-    val url = jdbcOptions.url
-    val table = jdbcOptions.table
-    val createTableOptions = jdbcOptions.createTableOptions
-    val isTruncate = jdbcOptions.isTruncate
+    val options = new JDBCOptions(parameters)
     val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
 
-    val conn = JdbcUtils.createConnectionFactory(jdbcOptions)()
+    val conn = JdbcUtils.createConnectionFactory(options)()
     try {
-      val tableExists = JdbcUtils.tableExists(conn, url, table)
+      val tableExists = JdbcUtils.tableExists(conn, options)
       if (tableExists) {
         mode match {
           case SaveMode.Overwrite =>
-            if (isTruncate && isCascadingTruncateTable(url) == Some(false)) {
+            if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) {
               // In this case, we should truncate table and then load.
-              truncateTable(conn, table)
-              val tableSchema = JdbcUtils.getSchemaOption(conn, url, table)
-              saveTable(df, url, table, tableSchema, isCaseSensitive, jdbcOptions)
+              truncateTable(conn, options.table)
+              val tableSchema = JdbcUtils.getSchemaOption(conn, options)
+              saveTable(df, tableSchema, isCaseSensitive, options)
             } else {
               // Otherwise, do not truncate the table, instead drop and recreate it
-              dropTable(conn, table)
-              createTable(df.schema, url, table, createTableOptions, conn)
-              saveTable(df, url, table, Some(df.schema), isCaseSensitive, jdbcOptions)
+              dropTable(conn, options.table)
+              createTable(conn, df.schema, options)
+              saveTable(df, Some(df.schema), isCaseSensitive, options)
             }
 
           case SaveMode.Append =>
-            val tableSchema = JdbcUtils.getSchemaOption(conn, url, table)
-            saveTable(df, url, table, tableSchema, isCaseSensitive, jdbcOptions)
+            val tableSchema = JdbcUtils.getSchemaOption(conn, options)
+            saveTable(df, tableSchema, isCaseSensitive, options)
 
           case SaveMode.ErrorIfExists =>
             throw new AnalysisException(
-              s"Table or view '$table' already exists. SaveMode: ErrorIfExists.")
+              s"Table or view '${options.table}' already exists. SaveMode: ErrorIfExists.")
 
           case SaveMode.Ignore =>
            // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
@@ -91,8 +87,8 @@ class JdbcRelationProvider extends CreatableRelationProvider
            // Therefore, it is okay to do nothing here and then just return the relation below.
         }
       } else {
-        createTable(df.schema, url, table, createTableOptions, conn)
-        saveTable(df, url, table, Some(df.schema), isCaseSensitive, jdbcOptions)
+        createTable(conn, df.schema, options)
+        saveTable(df, Some(df.schema), isCaseSensitive, options)
       }
     } finally {
       conn.close()
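
One subtlety in the `Overwrite` branch above: `isCascadingTruncateTable` returns `Option[Boolean]`, so the `== Some(false)` comparison takes the truncate path only when the dialect explicitly reports that `TRUNCATE` does not cascade; `None` ("unknown") falls through to drop-and-recreate. A minimal sketch of that three-valued check, with a made-up URL mapping:

```scala
// Hypothetical dialect mapping; only the `== Some(false)` semantics matter.
def isCascadingTruncateTableSketch(url: String): Option[Boolean] =
  if (url.startsWith("jdbc:safe")) Some(false)      // truncate known not to cascade
  else if (url.startsWith("jdbc:risky")) Some(true) // truncate would cascade
  else None                                         // unknown: stay conservative

assert(isCascadingTruncateTableSketch("jdbc:safe:db") == Some(false))  // truncate path
assert(isCascadingTruncateTableSketch("jdbc:other:db") != Some(false)) // drop/recreate
```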

http://git-wip-us.apache.org/repos/asf/spark/blob/5ed397ba/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 0590aec..d89f600 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -63,14 +63,14 @@ object JdbcUtils extends Logging {
   /**
    * Returns true if the table already exists in the JDBC database.
    */
-  def tableExists(conn: Connection, url: String, table: String): Boolean = {
-    val dialect = JdbcDialects.get(url)
+  def tableExists(conn: Connection, options: JDBCOptions): Boolean = {
+    val dialect = JdbcDialects.get(options.url)
 
     // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
     // SQL database systems using JDBC meta data calls, considering "table" could also include
     // the database name. Query used to find table exists can be overridden by the dialects.
     Try {
-      val statement = conn.prepareStatement(dialect.getTableExistsQuery(table))
+      val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.table))
       try {
         statement.executeQuery()
       } finally {
@@ -235,11 +235,11 @@ object JdbcUtils extends Logging {
   /**
    * Returns the schema if the table already exists in the JDBC database.
    */
-  def getSchemaOption(conn: Connection, url: String, table: String): Option[StructType] = {
-    val dialect = JdbcDialects.get(url)
+  def getSchemaOption(conn: Connection, options: JDBCOptions): Option[StructType] = {
+    val dialect = JdbcDialects.get(options.url)
 
     try {
-      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(options.table))
       try {
         Some(getSchema(statement.executeQuery(), dialect))
       } catch {
@@ -697,11 +697,11 @@ object JdbcUtils extends Logging {
    */
   def saveTable(
       df: DataFrame,
-      url: String,
-      table: String,
       tableSchema: Option[StructType],
       isCaseSensitive: Boolean,
       options: JDBCOptions): Unit = {
+    val url = options.url
+    val table = options.table
     val dialect = JdbcDialects.get(url)
     val rddSchema = df.schema
     val getConnection: () => Connection = createConnectionFactory(options)
@@ -725,12 +725,12 @@ object JdbcUtils extends Logging {
    * Creates a table with a given schema.
    */
   def createTable(
+      conn: Connection,
       schema: StructType,
-      url: String,
-      table: String,
-      createTableOptions: String,
-      conn: Connection): Unit = {
-    val strSchema = schemaString(schema, url)
+      options: JDBCOptions): Unit = {
+    val strSchema = schemaString(schema, options.url)
+    val table = options.table
+    val createTableOptions = options.createTableOptions
     // Create the table if the table does not exist.
     // To allow certain options to append when create a new table, which can be
     // table_options or partition_options.
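
As a standalone illustration of the existence-probe pattern in `tableExists` above: prepare a cheap dialect-supplied query against the table and treat any failure as "table absent". The probe string below is the common `SELECT 1 ... WHERE 1=0` idiom, assumed here for illustration rather than taken from a particular dialect:

```scala
import java.sql.Connection
import scala.util.Try

// Probe pattern sketch: execute a query that returns no rows; if it throws
// (for example because the table is missing), Try records the failure.
def tableExistsSketch(conn: Connection, table: String): Boolean =
  Try {
    val statement = conn.prepareStatement(s"SELECT 1 FROM $table WHERE 1=0")
    try statement.executeQuery() finally statement.close()
  }.isSuccess
```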



