spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-16401][SQL] Data Source API: Enable Extending RelationProvider and CreatableRelationProvider without Extending SchemaRelationProvider
Date Sat, 09 Jul 2016 12:37:48 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 50d7002b6 -> a33643cbf


[SPARK-16401][SQL] Data Source API: Enable Extending RelationProvider and CreatableRelationProvider
without Extending SchemaRelationProvider

#### What changes were proposed in this pull request?
When users try to implement a data source API with extending only `RelationProvider` and `CreatableRelationProvider`,
they will hit an error when resolving the relation.
```Scala
spark.read
.format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
  .load()
  .write.
format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
  .save()
```

The error they hit is like
```
org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema does not allow user-specified
schemas.;
org.apache.spark.sql.AnalysisException: org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema
does not allow user-specified schemas.;
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:319)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:494)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
```

Actually, the bug fix is simple. [`DataSource.createRelation(sparkSession.sqlContext, mode,
options, data)`](https://github.com/gatorsmile/spark/blob/dd644f8117e889cebd6caca58702a7c7e3d88bef/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L429)
already returns a BaseRelation. We should not assign schema to `userSpecifiedSchema`. That
schema assignment only makes sense for the data sources that extend `FileFormat`.

#### How was this patch tested?
Added a test case.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14075 from gatorsmile/dataSource.

(cherry picked from commit 7374e518e2641fddfe57003340db410224b37581)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a33643cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a33643cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a33643cb

Branch: refs/heads/branch-2.0
Commit: a33643cbf0f8b68bde5bd6f9a706ee0f5be377f9
Parents: 50d7002
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Sat Jul 9 20:35:45 2016 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Sat Jul 9 20:37:06 2016 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  |  5 ++-
 .../sql/test/DataFrameReaderWriterSuite.scala   | 32 ++++++++++++++++++++
 2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a33643cb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 6dc27c1..f572b93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -485,12 +485,11 @@ case class DataSource(
             data.logicalPlan,
             mode)
         sparkSession.sessionState.executePlan(plan).toRdd
+        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
it.
+        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
 
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
-
-    // We replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
it.
-    copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a33643cb/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 7308f85..27a0a2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -82,6 +82,29 @@ class DefaultSource
   }
 }
 
+/** Dummy provider with only RelationProvider and CreatableRelationProvider. */
+class DefaultSourceWithoutUserSpecifiedSchema
+  extends RelationProvider
+  with CreatableRelationProvider {
+
+  case class FakeRelation(sqlContext: SQLContext) extends BaseRelation {
+    override def schema: StructType = StructType(Seq(StructField("a", StringType)))
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    FakeRelation(sqlContext)
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    FakeRelation(sqlContext)
+  }
+}
 
 class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter
{
 
@@ -120,6 +143,15 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext
with Be
       .save()
   }
 
+  test("resolve default source without extending SchemaRelationProvider") {
+    spark.read
+      .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
+      .load()
+      .write
+      .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
+      .save()
+  }
+
   test("resolve full class") {
     spark.read
       .format("org.apache.spark.sql.test.DefaultSource")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message