spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-9486][SQL] Add data source aliasing for external packages
Date Sat, 08 Aug 2015 18:03:04 GMT
Repository: spark
Updated Branches:
  refs/heads/master 23695f1d2 -> a3aec918b


[SPARK-9486][SQL] Add data source aliasing for external packages

Users currently have to provide the fully qualified class name for external data sources, for example:

`sqlContext.read.format("com.databricks.spark.avro").load(path)`

This change allows external data source packages to register themselves using a `ServiceLoader` so
that they can add a custom alias, for example:

`sqlContext.read.format("avro").load(path)`

External data source packages can then be used with the same format syntax as the built-in
data sources such as parquet, json, etc.
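
For example, an external package can mix the new `DataSourceRegister` trait into its provider
class and list that class in its jar's `META-INF/services` directory. A minimal sketch (the
`com.example.avro` package name and the schema below are made up for illustration):

    package com.example.avro

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    class DefaultSource extends RelationProvider with DataSourceRegister {

      // the alias users can pass to .format(...) in place of the class name
      def format(): String = "avro"

      override def createRelation(ctx: SQLContext, parameters: Map[String, String]): BaseRelation =
        new BaseRelation {
          override def sqlContext: SQLContext = ctx
          override def schema: StructType =
            StructType(Seq(StructField("value", StringType, nullable = false)))
        }
    }

The jar then advertises the provider in a service registration file:

    # META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
    com.example.avro.DefaultSource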

Author: Joseph Batchik <joseph.batchik@cloudera.com>
Author: Joseph Batchik <josephbatchik@gmail.com>

Closes #7802 from JDrit/service_loader and squashes the following commits:

49a01ec [Joseph Batchik] fixed a couple of format / error bugs
e5e93b2 [Joseph Batchik] modified rat file to only excluded added services
72b349a [Joseph Batchik] fixed error with orc data source actually
9f93ea7 [Joseph Batchik] fixed error with orc data source
87b7f1c [Joseph Batchik] fixed typo
101cd22 [Joseph Batchik] removing unneeded changes
8f3cf43 [Joseph Batchik] merged in changes
b63d337 [Joseph Batchik] merged in master
95ae030 [Joseph Batchik] changed the new trait to be used as a mixin for data source to register
themselves
74db85e [Joseph Batchik] reformatted class loader
ac2270d [Joseph Batchik] removing some added test
a6926db [Joseph Batchik] added test cases for data source loader
208a2a8 [Joseph Batchik] changes to do error catching if there are multiple data sources
946186e [Joseph Batchik] started working on service loader


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3aec918
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3aec918
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3aec918

Branch: refs/heads/master
Commit: a3aec918bed22f8e33cf91dc0d6e712e6653c7d2
Parents: 23695f1
Author: Joseph Batchik <joseph.batchik@cloudera.com>
Authored: Sat Aug 8 11:03:01 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Sat Aug 8 11:03:01 2015 -0700

----------------------------------------------------------------------
 .rat-excludes                                   |  1 +
 ....apache.spark.sql.sources.DataSourceRegister |  3 +
 .../spark/sql/execution/datasources/ddl.scala   | 52 ++++++------
 .../apache/spark/sql/jdbc/JDBCRelation.scala    |  5 +-
 .../apache/spark/sql/json/JSONRelation.scala    |  5 +-
 .../spark/sql/parquet/ParquetRelation.scala     |  5 +-
 .../apache/spark/sql/sources/interfaces.scala   | 21 +++++
 ....apache.spark.sql.sources.DataSourceRegister |  3 +
 .../spark/sql/sources/DDLSourceLoadSuite.scala  | 85 ++++++++++++++++++++
 ....apache.spark.sql.sources.DataSourceRegister |  1 +
 .../apache/spark/sql/hive/orc/OrcRelation.scala |  5 +-
 11 files changed, 156 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a3aec918/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 236c2db..7277146 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -93,3 +93,4 @@ INDEX
 .lintr
 gen-java.*
 .*avpr
+org.apache.spark.sql.sources.DataSourceRegister

http://git-wip-us.apache.org/repos/asf/spark/blob/a3aec918/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..cc32d4b
--- /dev/null
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,3 @@
+org.apache.spark.sql.jdbc.DefaultSource
+org.apache.spark.sql.json.DefaultSource
+org.apache.spark.sql.parquet.DefaultSource
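
For background, `java.util.ServiceLoader` instantiates every class listed in a classpath
resource with this name. A minimal sketch of that discovery step, independent of the lookup
logic in ddl.scala below:

    import java.util.ServiceLoader
    import scala.collection.JavaConversions._

    import org.apache.spark.sql.sources.DataSourceRegister

    val loader = Thread.currentThread().getContextClassLoader
    // instantiates each provider listed under
    // META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
    // and prints the alias it registers
    ServiceLoader.load(classOf[DataSourceRegister], loader)
      .iterator()
      .foreach(register => println(register.format()))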

http://git-wip-us.apache.org/repos/asf/spark/blob/a3aec918/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 0cdb407..8c2f297 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -17,7 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.ServiceLoader
+
+import scala.collection.Iterator
+import scala.collection.JavaConversions._
 import scala.language.{existentials, implicitConversions}
+import scala.util.{Failure, Success, Try}
 import scala.util.matching.Regex
 
 import org.apache.hadoop.fs.Path
@@ -190,37 +195,32 @@ private[sql] class DDLParser(
     }
 }
 
-private[sql] object ResolvedDataSource {
-
-  private val builtinSources = Map(
-    "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource",
-    "json" -> "org.apache.spark.sql.json.DefaultSource",
-    "parquet" -> "org.apache.spark.sql.parquet.DefaultSource",
-    "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource"
-  )
+private[sql] object ResolvedDataSource extends Logging {
 
   /** Given a provider name, look up the data source class definition. */
   def lookupDataSource(provider: String): Class[_] = {
+    val provider2 = s"$provider.DefaultSource"
     val loader = Utils.getContextOrSparkClassLoader
-
-    if (builtinSources.contains(provider)) {
-      return loader.loadClass(builtinSources(provider))
-    }
-
-    try {
-      loader.loadClass(provider)
-    } catch {
-      case cnf: java.lang.ClassNotFoundException =>
-        try {
-          loader.loadClass(provider + ".DefaultSource")
-        } catch {
-          case cnf: java.lang.ClassNotFoundException =>
-            if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-              sys.error("The ORC data source must be used with Hive support enabled.")
-            } else {
-              sys.error(s"Failed to load class for data source: $provider")
-            }
+    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
+
+    serviceLoader.iterator().filter(_.format().equalsIgnoreCase(provider)).toList match {
+      /** the provider format did not match any given registered aliases */
+      case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
+        case Success(dataSource) => dataSource
+        case Failure(error) => if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+          throw new ClassNotFoundException(
+            "The ORC data source must be used with Hive support enabled.", error)
+        } else {
+          throw new ClassNotFoundException(
+            s"Failed to load class for data source: $provider", error)
         }
+      }
+      /** there is exactly one registered alias */
+      case head :: Nil => head.getClass
+      /** There are multiple registered aliases for the input */
+      case sources => sys.error(s"Multiple sources found for $provider, " +
+        s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+        "please specify the fully qualified class name")
     }
   }
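
The resolution order above can be summarized with a short sketch of the lookup behavior
(`com.example.CustomSource` and `path` are made-up placeholders):

    val path = "/path/to/data"  // placeholder

    // 1. An alias registered through the ServiceLoader wins:
    //    "json" resolves to org.apache.spark.sql.json.DefaultSource.
    sqlContext.read.format("json").load(path)

    // 2. No registered alias matches: the name is loaded as a class,
    //    then as "<name>.DefaultSource".
    sqlContext.read.format("com.example.CustomSource").load(path)

    // 3. Several sources registered under the same alias: resolution fails,
    //    asking for the fully qualified class name (see DDLSourceLoadSuite below).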
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a3aec918/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index 41d0ecb..48d97ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -77,7 +77,10 @@ private[sql] object JDBCRelation {
   }
 }
 
-private[sql] class DefaultSource extends RelationProvider {
+private[sql] class DefaultSource extends RelationProvider with DataSourceRegister {
+
+  def format(): String = "jdbc"
+
   /** Returns a new base relation with the given parameters. */
   override def createRelation(
       sqlContext: SQLContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/a3aec918/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 10f1367..b34a272 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -37,7 +37,10 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 
-private[sql] class DefaultSource extends HadoopFsRelationProvider {
+private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
+
+  def format(): String = "json"
+
   override def createRelation(
       sqlContext: SQLContext,
       paths: Array[String],

http://git-wip-us.apache.org/repos/asf/spark/blob/a3aec918/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 48009b2..b6db71b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -49,7 +49,10 @@ import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 
-private[sql] class DefaultSource extends HadoopFsRelationProvider {
+private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
+
+  def format(): String = "parquet"
+
   override def createRelation(
       sqlContext: SQLContext,
       paths: Array[String],

http://git-wip-us.apache.org/repos/asf/spark/blob/a3aec918/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index c5b7ee7..4aafec0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -39,6 +39,27 @@ import org.apache.spark.util.SerializableConfiguration
 
 /**
  * ::DeveloperApi::
+ * Data sources should implement this trait so that they can register an alias to their data source.
+ * This allows users to give the data source alias as the format type over the fully qualified
+ * class name.
+ *
+ * ex: parquet.DefaultSource.format = "parquet".
+ *
+ * A new instance of this class will be instantiated each time a DDL call is made.
+ */
+@DeveloperApi
+trait DataSourceRegister {
+
+  /**
+   * The string that represents the format that this data source provider uses. This is
+   * overridden by children to provide a nice alias for the data source,
+   * ex: override def format(): String = "parquet"
+   */
+  def format(): String
+}
+
+/**
+ * ::DeveloperApi::
  * Implemented by objects that produce relations for a specific kind of data source.  When
  * Spark SQL is given a DDL operation with a USING clause specified (to specify the implemented
  * RelationProvider), this interface is used to pass in the parameters specified by a user.

http://git-wip-us.apache.org/repos/asf/spark/blob/a3aec918/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..cfd7889
--- /dev/null
+++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,3 @@
+org.apache.spark.sql.sources.FakeSourceOne
+org.apache.spark.sql.sources.FakeSourceTwo
+org.apache.spark.sql.sources.FakeSourceThree

http://git-wip-us.apache.org/repos/asf/spark/blob/a3aec918/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
new file mode 100644
index 0000000..1a4d41b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -0,0 +1,85 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class FakeSourceOne extends RelationProvider with DataSourceRegister {
+
+  def format(): String = "Fluet da Bomb"
+
+  override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
+    new BaseRelation {
+      override def sqlContext: SQLContext = cont
+
+      override def schema: StructType =
+        StructType(Seq(StructField("stringType", StringType, nullable = false)))
+    }
+}
+
+class FakeSourceTwo extends RelationProvider with DataSourceRegister {
+
+  def format(): String = "Fluet da Bomb"
+
+  override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
+    new BaseRelation {
+      override def sqlContext: SQLContext = cont
+
+      override def schema: StructType =
+        StructType(Seq(StructField("stringType", StringType, nullable = false)))
+    }
+}
+
+class FakeSourceThree extends RelationProvider with DataSourceRegister {
+
+  def format(): String = "gathering quorum"
+
+  override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
+    new BaseRelation {
+      override def sqlContext: SQLContext = cont
+
+      override def schema: StructType =
+        StructType(Seq(StructField("stringType", StringType, nullable = false)))
+    }
+}
+// please note that the META-INF/services had to be modified for the test directory for this to work
+class DDLSourceLoadSuite extends DataSourceTest {
+
+  test("data sources with the same name") {
+    intercept[RuntimeException] {
+      caseInsensitiveContext.read.format("Fluet da Bomb").load()
+    }
+  }
+
+  test("load data source from format alias") {
+    caseInsensitiveContext.read.format("gathering quorum").load().schema ==
+      StructType(Seq(StructField("stringType", StringType, nullable = false)))
+  }
+
+  test("specify full classname with duplicate formats") {
+    caseInsensitiveContext.read.format("org.apache.spark.sql.sources.FakeSourceOne")
+      .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))
+  }
+
+  test("Loading Orc") {
+    intercept[ClassNotFoundException] {
+      caseInsensitiveContext.read.format("orc").load()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a3aec918/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..4a774fb
--- /dev/null
+++ b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.hive.orc.DefaultSource

http://git-wip-us.apache.org/repos/asf/spark/blob/a3aec918/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 7c8704b..0c344c6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -47,7 +47,10 @@ import org.apache.spark.util.SerializableConfiguration
 /* Implicit conversions */
 import scala.collection.JavaConversions._
 
-private[sql] class DefaultSource extends HadoopFsRelationProvider {
+private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
+
+  def format(): String = "orc"
+
   def createRelation(
       sqlContext: SQLContext,
       paths: Array[String],



