spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject spark git commit: [SPARK-7601] [SQL] Support Insert into JDBC Datasource
Date Thu, 14 May 2015 00:24:39 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 c53ebea9d -> 820aaa6b9


[SPARK-7601] [SQL] Support Insert into JDBC Datasource

Supported InsertableRelation for JDBC Datasource JDBCRelation.
Example usage:
sqlContext.sql(
      s"""
        |CREATE TEMPORARY TABLE testram1
        |USING org.apache.spark.sql.jdbc
        |OPTIONS (url '$url', dbtable 'testram1', user 'xx', password 'xx', driver 'com.h2.Driver')
      """.stripMargin.replaceAll("\n", " "))

sqlContext.sql("insert into table testram1 select * from testsrc")
sqlContext.sql("insert overwrite table testram1 select * from testsrc")

Author: Venkata Ramana Gollamudi <ramana.gollamudi@huawei.com>

Closes #6121 from gvramana/JDBCDatasource_insert and squashes the following commits:

f3fb5f1 [Venkata Ramana Gollamudi] Support for JDBC Datasource InsertableRelation

(cherry picked from commit 59aaa1dad6bee06e38ee5c03bdf82354242286ee)
Signed-off-by: Michael Armbrust <michael@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/820aaa6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/820aaa6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/820aaa6b

Branch: refs/heads/branch-1.4
Commit: 820aaa6b9abe2045f6d392c430082601770e91c9
Parents: c53ebea
Author: Venkata Ramana Gollamudi <ramana.gollamudi@huawei.com>
Authored: Wed May 13 17:24:04 2015 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Wed May 13 17:24:31 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/jdbc/JDBCRelation.scala    |  8 ++++-
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  | 37 +++++++++++++++++++-
 2 files changed, 43 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/820aaa6b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index d6b3fb3..93e8254 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.sources._
@@ -129,7 +130,8 @@ private[sql] case class JDBCRelation(
     parts: Array[Partition],
     properties: Properties = new Properties())(@transient val sqlContext: SQLContext)
   extends BaseRelation
-  with PrunedFilteredScan {
+  with PrunedFilteredScan
+  with InsertableRelation {
 
   override val needConversion: Boolean = false
 
@@ -148,4 +150,8 @@ private[sql] case class JDBCRelation(
       filters,
       parts)
   }
+  
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    data.insertIntoJDBC(url, table, overwrite, properties)
+  }  
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/820aaa6b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index f3ce8e6..0800ede 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -43,6 +43,29 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
    
     conn1 = DriverManager.getConnection(url1, properties)
     conn1.prepareStatement("create schema test").executeUpdate()
+    conn1.prepareStatement("drop table if exists test.people").executeUpdate()
+    conn1.prepareStatement(
+      "create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
+    conn1.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate()
+    conn1.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate()
+    conn1.prepareStatement("drop table if exists test.people1").executeUpdate()
+    conn1.prepareStatement(
+      "create table test.people1 (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
+    conn1.commit()
+     
+    TestSQLContext.sql(
+      s"""
+        |CREATE TEMPORARY TABLE PEOPLE
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass')
+      """.stripMargin.replaceAll("\n", " "))
+    
+    TestSQLContext.sql(
+      s"""
+        |CREATE TEMPORARY TABLE PEOPLE1
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass')
+      """.stripMargin.replaceAll("\n", " "))  
   }
 
   after {
@@ -114,5 +137,17 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
       df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true)
     }
   }
-
+  
+  test("INSERT to JDBC Datasource") {
+    TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+  }
+  
+  test("INSERT to JDBC Datasource with overwrite") {
+    TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
+    TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+  } 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message