spark-reviews mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [spark] brkyvz commented on a change in pull request #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider
Date Mon, 23 Dec 2019 19:04:04 GMT
brkyvz commented on a change in pull request #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider
URL: https://github.com/apache/spark/pull/26913#discussion_r360976390
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
 ##########
 @@ -258,37 +258,73 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val dsOptions = new CaseInsensitiveStringMap(options.asJava)
 
       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
-      provider.getTable(dsOptions) match {
-        case table: SupportsWrite if table.supports(BATCH_WRITE) =>
-          if (partitioningColumns.nonEmpty) {
-            throw new AnalysisException("Cannot write data to TableProvider implementation " +
-              "if partition columns are specified.")
+      mode match {
+        case SaveMode.Append | SaveMode.Overwrite =>
+          val table = provider match {
+            case supportsExtract: SupportsCatalogOptions =>
+              val ident = supportsExtract.extractIdentifier(dsOptions)
+              val sessionState = df.sparkSession.sessionState
+              val catalog = CatalogV2Util.getTableProviderCatalog(
+                supportsExtract, sessionState.catalogManager, dsOptions)
+
+              catalog.loadTable(ident)
+            case tableProvider: TableProvider => tableProvider.getTable(dsOptions)
+            case _ =>
+              // Streaming also uses the data source V2 API. So it may be that the data source
+              // implements v2, but has no v2 implementation for batch writes. In that case, we fall
+              // back to saving as though it's a V1 source.
+              return saveToV1Source()
+          }
+
+          val relation = DataSourceV2Relation.create(table, dsOptions)
+          checkPartitioningMatchesV2Table(table)
+          if (mode == SaveMode.Append) {
+            runCommand(df.sparkSession, "save") {
+              AppendData.byName(relation, df.logicalPlan, extraOptions.toMap)
+            }
+          } else {
+            // Truncate the table. TableCapabilityCheck will throw a nice exception if this
+            // isn't supported
+            runCommand(df.sparkSession, "save") {
+              OverwriteByExpression.byName(
+                relation, df.logicalPlan, Literal(true), extraOptions.toMap)
+            }
           }
-          lazy val relation = DataSourceV2Relation.create(table, dsOptions)
-          mode match {
-            case SaveMode.Append =>
-              runCommand(df.sparkSession, "save") {
-                AppendData.byName(relation, df.logicalPlan, extraOptions.toMap)
-              }
 
-            case SaveMode.Overwrite if table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER) =>
-              // truncate the table
+        case create =>
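
 To make the Append/Overwrite branch above concrete, here is a minimal sketch (not taken from this PR's diff) of a source implementing SupportsCatalogOptions. It assumes SupportsCatalogOptions extends TableProvider and exposes extractIdentifier, which save() combines with CatalogV2Util.getTableProviderCatalog to load the table from a catalog instead of calling getTable(dsOptions). The class name and the "table" option key are made up for illustration.

 ```scala
 import org.apache.spark.sql.connector.catalog.{Identifier, SupportsCatalogOptions, Table}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 // Illustrative only: a provider that lets DataFrameWriter.save() resolve the
 // target table through a catalog rather than through getTable(dsOptions).
 class ExampleCatalogAwareProvider extends SupportsCatalogOptions {

   // Called by the Append/Overwrite branch above to turn the write options
   // into an identifier that the resolved catalog can load.
   override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier =
     Identifier.of(Array("default"), options.get("table"))

   // Plain TableProvider entry point (inherited via SupportsCatalogOptions);
   // left unimplemented in this sketch.
   override def getTable(options: CaseInsensitiveStringMap): Table =
     throw new UnsupportedOperationException("sketch only")
 }
 ```

 With such a provider, a write like the following (format name and option key are hypothetical) would go through the new catalog-based path:

 ```scala
 df.write
   .format("com.example.catalogAware")  // hypothetical provider
   .option("table", "events")           // consumed by extractIdentifier above
   .mode("append")                      // SaveMode.Append branch above
   .save()
 ```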
 
 Review comment:
   It's not unused; it's used in the Ignore check below.
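
   For context, a simplified, illustrative sketch of the pattern being discussed (not the PR's exact code): once Append and Overwrite are handled, the `create` binding still carries ErrorIfExists or Ignore, and the create-table path reads it later.

   ```scala
   import org.apache.spark.sql.SaveMode

   // Illustration only: the bound `create` value is consulted after the
   // Append/Overwrite cases, e.g. to decide whether creating a table that
   // already exists should be a no-op instead of an error.
   def ignoreIfTableExists(mode: SaveMode): Boolean = mode match {
     case SaveMode.Append | SaveMode.Overwrite =>
       false // handled by the branch shown in the diff above
     case create =>
       create == SaveMode.Ignore
   }
   ```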
