spark-commits mailing list archives

From wenc...@apache.org
Subject [2/2] spark git commit: [SPARK-23260][SPARK-23262][SQL] several data source v2 naming cleanup
Date Tue, 30 Jan 2018 11:43:26 GMT
[SPARK-23260][SPARK-23262][SQL] several data source v2 naming cleanup

## What changes were proposed in this pull request?

All other classes in the reader/writer packages don't have `V2` in their names, and neither do the streaming readers/writers. It's more consistent to remove `V2` from `DataSourceV2Reader` and `DataSourceV2Writer`.

Also rename `DataSourceV2Options` to remove the `V2`; we should only have `V2` in the root interface, `DataSourceV2`.

This PR also fixes a few places where a mix-in interface didn't extend the interface it was meant to mix into.
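
To illustrate the renamed API surface, here is a minimal sketch (not part of this patch) of a read-only source written against the new names: `ReadSupport` now extends `DataSourceV2`, the reader type is `DataSourceReader`, and options arrive as `DataSourceOptions`. The class name and the option key below are hypothetical.

```java
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.types.StructType;

public class ExampleEmptySource implements DataSourceV2, ReadSupport {

  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    // Option keys are matched case-insensitively; "columnName" is a made-up key.
    String columnName = options.get("columnName").orElse("value");
    return new DataSourceReader() {
      @Override
      public StructType readSchema() {
        return new StructType().add(columnName, "string");
      }

      @Override
      public List<DataReaderFactory<Row>> createDataReaderFactories() {
        // No partitions, so the scan produces an empty result.
        return Collections.emptyList();
      }
    };
  }
}
```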

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20427 from cloud-fan/ds-v2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a9ac024
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a9ac024
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a9ac024

Branch: refs/heads/master
Commit: 0a9ac0248b6514a1e83ff7e4c522424f01b8b78d
Parents: 5056877
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Tue Jan 30 19:43:17 2018 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Tue Jan 30 19:43:17 2018 +0800

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala    |   2 +-
 .../sql/kafka010/KafkaSourceProvider.scala      |   6 +-
 .../spark/sql/sources/v2/DataSourceOptions.java | 100 +++++++++++++++++++
 .../sql/sources/v2/DataSourceV2Options.java     | 100 -------------------
 .../spark/sql/sources/v2/ReadSupport.java       |   8 +-
 .../sql/sources/v2/ReadSupportWithSchema.java   |   8 +-
 .../sql/sources/v2/SessionConfigSupport.java    |   2 +-
 .../spark/sql/sources/v2/WriteSupport.java      |  12 +--
 .../sources/v2/reader/DataReaderFactory.java    |   2 +-
 .../sql/sources/v2/reader/DataSourceReader.java |  80 +++++++++++++++
 .../sources/v2/reader/DataSourceV2Reader.java   |  79 ---------------
 .../reader/SupportsPushDownCatalystFilters.java |   4 +-
 .../v2/reader/SupportsPushDownFilters.java      |   4 +-
 .../reader/SupportsPushDownRequiredColumns.java |   6 +-
 .../v2/reader/SupportsReportPartitioning.java   |   4 +-
 .../v2/reader/SupportsReportStatistics.java     |   4 +-
 .../v2/reader/SupportsScanColumnarBatch.java    |   6 +-
 .../v2/reader/SupportsScanUnsafeRow.java        |   6 +-
 .../v2/streaming/ContinuousReadSupport.java     |   4 +-
 .../v2/streaming/MicroBatchReadSupport.java     |   4 +-
 .../v2/streaming/StreamWriteSupport.java        |  10 +-
 .../v2/streaming/reader/ContinuousReader.java   |   6 +-
 .../v2/streaming/reader/MicroBatchReader.java   |   6 +-
 .../v2/streaming/writer/StreamWriter.java       |   6 +-
 .../sources/v2/writer/DataSourceV2Writer.java   |  94 -----------------
 .../sql/sources/v2/writer/DataSourceWriter.java |  94 +++++++++++++++++
 .../spark/sql/sources/v2/writer/DataWriter.java |  12 +--
 .../sources/v2/writer/DataWriterFactory.java    |   2 +-
 .../v2/writer/SupportsWriteInternalRow.java     |   4 +-
 .../sources/v2/writer/WriterCommitMessage.java  |   4 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |   2 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |   2 +-
 .../datasources/v2/DataSourceReaderHolder.scala |   2 +-
 .../datasources/v2/DataSourceV2Relation.scala   |   6 +-
 .../datasources/v2/DataSourceV2ScanExec.scala   |   2 +-
 .../datasources/v2/WriteToDataSourceV2.scala    |   4 +-
 .../streaming/MicroBatchExecution.scala         |   6 +-
 .../streaming/RateSourceProvider.scala          |   2 +-
 .../spark/sql/execution/streaming/console.scala |   4 +-
 .../continuous/ContinuousExecution.scala        |   6 +-
 .../continuous/ContinuousRateStreamSource.scala |   7 +-
 .../streaming/sources/ConsoleWriter.scala       |   4 +-
 .../streaming/sources/MicroBatchWriter.scala    |   8 +-
 .../sources/PackedRowWriterFactory.scala        |   4 +-
 .../streaming/sources/RateStreamSourceV2.scala  |   6 +-
 .../execution/streaming/sources/memoryV2.scala  |   6 +-
 .../spark/sql/streaming/DataStreamReader.scala  |   4 +-
 .../sources/v2/JavaAdvancedDataSourceV2.java    |   6 +-
 .../sql/sources/v2/JavaBatchDataSourceV2.java   |   6 +-
 .../v2/JavaPartitionAwareDataSource.java        |   6 +-
 .../v2/JavaSchemaRequiredDataSource.java        |   8 +-
 .../sql/sources/v2/JavaSimpleDataSourceV2.java  |   8 +-
 .../sources/v2/JavaUnsafeRowDataSourceV2.java   |   6 +-
 .../execution/streaming/RateSourceV2Suite.scala |  18 ++--
 .../sql/sources/v2/DataSourceOptionsSuite.scala |  82 +++++++++++++++
 .../sources/v2/DataSourceV2OptionsSuite.scala   |  82 ---------------
 .../sql/sources/v2/DataSourceV2Suite.scala      |  24 ++---
 .../sources/v2/SimpleWritableDataSource.scala   |  12 +--
 .../sources/StreamingDataSourceV2Suite.scala    |   8 +-
 59 files changed, 510 insertions(+), 510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 9125cf5..8c73342 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -41,7 +41,7 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param offsetReader  a reader used to get kafka offsets. Note that the actual data will be
  *                      read by per-task consumers generated later.
  * @param kafkaParams   String params for per-task Kafka consumers.
- * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which
+ * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
  *                      are not Kafka consumer params.
  * @param metadataPath Path to a directory this reader can use for writing metadata.
  * @param initialOffsets The Kafka offsets to start reading data at.

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 2deb7fa..85e96b6 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
@@ -109,7 +109,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   override def createContinuousReader(
       schema: Optional[StructType],
       metadataPath: String,
-      options: DataSourceV2Options): KafkaContinuousReader = {
+      options: DataSourceOptions): KafkaContinuousReader = {
     val parameters = options.asMap().asScala.toMap
     validateStreamOptions(parameters)
     // Each running query should use its own group id. Otherwise, the query may be only assigned
@@ -227,7 +227,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceV2Options): StreamWriter = {
+      options: DataSourceOptions): StreamWriter = {
     import scala.collection.JavaConverters._
 
     val spark = SparkSession.getActiveSession.get

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
new file mode 100644
index 0000000..c320535
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An immutable string-to-string map in which keys are case-insensitive. This is used to represent
+ * data source options.
+ */
+@InterfaceStability.Evolving
+public class DataSourceOptions {
+  private final Map<String, String> keyLowerCasedMap;
+
+  private String toLowerCase(String key) {
+    return key.toLowerCase(Locale.ROOT);
+  }
+
+  public static DataSourceOptions empty() {
+    return new DataSourceOptions(new HashMap<>());
+  }
+
+  public DataSourceOptions(Map<String, String> originalMap) {
+    keyLowerCasedMap = new HashMap<>(originalMap.size());
+    for (Map.Entry<String, String> entry : originalMap.entrySet()) {
+      keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());
+    }
+  }
+
+  public Map<String, String> asMap() {
+    return new HashMap<>(keyLowerCasedMap);
+  }
+
+  /**
+   * Returns the option value to which the specified key is mapped, case-insensitively.
+   */
+  public Optional<String> get(String key) {
+    return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key)));
+  }
+
+  /**
+   * Returns the boolean value to which the specified key is mapped,
+   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
+   */
+  public boolean getBoolean(String key, boolean defaultValue) {
+    String lcaseKey = toLowerCase(key);
+    return keyLowerCasedMap.containsKey(lcaseKey) ?
+      Boolean.parseBoolean(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
+  }
+
+  /**
+   * Returns the integer value to which the specified key is mapped,
+   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
+   */
+  public int getInt(String key, int defaultValue) {
+    String lcaseKey = toLowerCase(key);
+    return keyLowerCasedMap.containsKey(lcaseKey) ?
+      Integer.parseInt(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
+  }
+
+  /**
+   * Returns the long value to which the specified key is mapped,
+   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
+   */
+  public long getLong(String key, long defaultValue) {
+    String lcaseKey = toLowerCase(key);
+    return keyLowerCasedMap.containsKey(lcaseKey) ?
+      Long.parseLong(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
+  }
+
+  /**
+   * Returns the double value to which the specified key is mapped,
+   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
+   */
+  public double getDouble(String key, double defaultValue) {
+    String lcaseKey = toLowerCase(key);
+    return keyLowerCasedMap.containsKey(lcaseKey) ?
+      Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
+  }
+}
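
A quick usage sketch (not part of the diff) of the renamed `DataSourceOptions` above, showing the case-insensitive lookup and the typed getters with defaults; the option keys are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.sources.v2.DataSourceOptions;

public class DataSourceOptionsExample {
  public static void main(String[] args) {
    Map<String, String> raw = new HashMap<>();
    raw.put("Path", "/tmp/data");          // hypothetical option keys
    raw.put("maxFilesPerTrigger", "10");

    DataSourceOptions options = new DataSourceOptions(raw);

    // Keys are lower-cased on the way in, so lookups ignore case.
    String path = options.get("path").orElse("(none)");            // "/tmp/data"
    int maxFiles = options.getInt("MAXFILESPERTRIGGER", 1);        // 10
    boolean verify = options.getBoolean("verifyChecksum", false);  // default used

    System.out.println(path + ", " + maxFiles + ", " + verify);
  }
}
```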

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
deleted file mode 100644
index ddc2acc..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An immutable string-to-string map in which keys are case-insensitive. This is used to represent
- * data source options.
- */
-@InterfaceStability.Evolving
-public class DataSourceV2Options {
-  private final Map<String, String> keyLowerCasedMap;
-
-  private String toLowerCase(String key) {
-    return key.toLowerCase(Locale.ROOT);
-  }
-
-  public static DataSourceV2Options empty() {
-    return new DataSourceV2Options(new HashMap<>());
-  }
-
-  public DataSourceV2Options(Map<String, String> originalMap) {
-    keyLowerCasedMap = new HashMap<>(originalMap.size());
-    for (Map.Entry<String, String> entry : originalMap.entrySet()) {
-      keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());
-    }
-  }
-
-  public Map<String, String> asMap() {
-    return new HashMap<>(keyLowerCasedMap);
-  }
-
-  /**
-   * Returns the option value to which the specified key is mapped, case-insensitively.
-   */
-  public Optional<String> get(String key) {
-    return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key)));
-  }
-
-  /**
-   * Returns the boolean value to which the specified key is mapped,
-   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
-   */
-  public boolean getBoolean(String key, boolean defaultValue) {
-    String lcaseKey = toLowerCase(key);
-    return keyLowerCasedMap.containsKey(lcaseKey) ?
-      Boolean.parseBoolean(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
-  }
-
-  /**
-   * Returns the integer value to which the specified key is mapped,
-   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
-   */
-  public int getInt(String key, int defaultValue) {
-    String lcaseKey = toLowerCase(key);
-    return keyLowerCasedMap.containsKey(lcaseKey) ?
-      Integer.parseInt(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
-  }
-
-  /**
-   * Returns the long value to which the specified key is mapped,
-   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
-   */
-  public long getLong(String key, long defaultValue) {
-    String lcaseKey = toLowerCase(key);
-    return keyLowerCasedMap.containsKey(lcaseKey) ?
-      Long.parseLong(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
-  }
-
-  /**
-   * Returns the double value to which the specified key is mapped,
-   * or defaultValue if there is no mapping for the key. The key match is case-insensitive
-   */
-  public double getDouble(String key, double defaultValue) {
-    String lcaseKey = toLowerCase(key);
-    return keyLowerCasedMap.containsKey(lcaseKey) ?
-      Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
index 948e20b..0ea4dc6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
@@ -18,17 +18,17 @@
 package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 
 /**
  * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
  * provide data reading ability and scan the data from the data source.
  */
 @InterfaceStability.Evolving
-public interface ReadSupport {
+public interface ReadSupport extends DataSourceV2 {
 
   /**
-   * Creates a {@link DataSourceV2Reader} to scan the data from this data source.
+   * Creates a {@link DataSourceReader} to scan the data from this data source.
    *
    * If this method fails (by throwing an exception), the action would fail and no Spark job was
    * submitted.
@@ -36,5 +36,5 @@ public interface ReadSupport {
    * @param options the options for the returned data source reader, which is an immutable
    *                case-insensitive string-to-string map.
    */
-  DataSourceV2Reader createReader(DataSourceV2Options options);
+  DataSourceReader createReader(DataSourceOptions options);
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
index b69c6be..3801402 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupportWithSchema.java
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.types.StructType;
 
 /**
@@ -30,10 +30,10 @@ import org.apache.spark.sql.types.StructType;
  * supports both schema inference and user-specified schema.
  */
 @InterfaceStability.Evolving
-public interface ReadSupportWithSchema {
+public interface ReadSupportWithSchema extends DataSourceV2 {
 
   /**
-   * Create a {@link DataSourceV2Reader} to scan the data from this data source.
+   * Create a {@link DataSourceReader} to scan the data from this data source.
    *
    * If this method fails (by throwing an exception), the action would fail and no Spark job was
    * submitted.
@@ -45,5 +45,5 @@ public interface ReadSupportWithSchema {
    * @param options the options for the returned data source reader, which is an immutable
    *                case-insensitive string-to-string map.
    */
-  DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options);
+  DataSourceReader createReader(StructType schema, DataSourceOptions options);
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
index 3cb020d..9d66805 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability;
  * session.
  */
 @InterfaceStability.Evolving
-public interface SessionConfigSupport {
+public interface SessionConfigSupport extends DataSourceV2 {
 
     /**
      * Key prefix of the session configs to propagate. Spark will extract all session configs that

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
index 1e3b644..cab5645 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
@@ -21,7 +21,7 @@ import java.util.Optional;
 
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.types.StructType;
 
 /**
@@ -29,17 +29,17 @@ import org.apache.spark.sql.types.StructType;
  * provide data writing ability and save the data to the data source.
  */
 @InterfaceStability.Evolving
-public interface WriteSupport {
+public interface WriteSupport extends DataSourceV2 {
 
   /**
-   * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
+   * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
    * sources can return None if there is no writing needed to be done according to the save mode.
    *
    * If this method fails (by throwing an exception), the action would fail and no Spark job was
    * submitted.
    *
    * @param jobId A unique string for the writing job. It's possible that there are many writing
-   *              jobs running at the same time, and the returned {@link DataSourceV2Writer} can
+   *              jobs running at the same time, and the returned {@link DataSourceWriter} can
    *              use this job id to distinguish itself from other jobs.
    * @param schema the schema of the data to be written.
    * @param mode the save mode which determines what to do when the data are already in this data
@@ -47,6 +47,6 @@ public interface WriteSupport {
    * @param options the options for the returned data source writer, which is an immutable
    *                case-insensitive string-to-string map.
    */
-  Optional<DataSourceV2Writer> createWriter(
-      String jobId, StructType schema, SaveMode mode, DataSourceV2Options options);
+  Optional<DataSourceWriter> createWriter(
+      String jobId, StructType schema, SaveMode mode, DataSourceOptions options);
 }
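
For context, a minimal sketch (not part of the diff) of a `WriteSupport` implementation against the renamed `DataSourceWriter`. The writer-side helpers (`DataWriterFactory#createDataWriter(int, int)`, `DataWriter`'s `write`/`commit`/`abort`, `WriterCommitMessage`) are assumed to follow the contracts referenced elsewhere in this patch; all class names are hypothetical and the example simply discards records.

```java
import java.io.IOException;
import java.util.Optional;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

public class ExampleDiscardingSink implements DataSourceV2, WriteSupport {

  @Override
  public Optional<DataSourceWriter> createWriter(
      String jobId, StructType schema, SaveMode mode, DataSourceOptions options) {
    if (mode == SaveMode.Ignore) {
      // Returning an empty Optional means no write is needed for this save mode.
      return Optional.empty();
    }
    return Optional.of(new DataSourceWriter() {
      @Override
      public DataWriterFactory<Row> createWriterFactory() {
        return new DiscardingWriterFactory();
      }

      @Override
      public void commit(WriterCommitMessage[] messages) { /* nothing to finalize */ }

      @Override
      public void abort(WriterCommitMessage[] messages) { /* nothing to clean up */ }
    });
  }

  static class EmptyCommitMessage implements WriterCommitMessage { }

  static class DiscardingWriterFactory implements DataWriterFactory<Row> {
    @Override
    public DataWriter<Row> createDataWriter(int partitionId, int attemptNumber) {
      return new DataWriter<Row>() {
        @Override
        public void write(Row record) throws IOException { /* drop the record */ }

        @Override
        public WriterCommitMessage commit() throws IOException {
          return new EmptyCommitMessage();
        }

        @Override
        public void abort() throws IOException { /* nothing to roll back */ }
      };
    }
  }
}
```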

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java
index 077b95b..32e98e8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A reader factory returned by {@link DataSourceV2Reader#createDataReaderFactories()} and is
+ * A reader factory returned by {@link DataSourceReader#createDataReaderFactories()} and is
  * responsible for creating the actual data reader. The relationship between
  * {@link DataReaderFactory} and {@link DataReader}
  * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
new file mode 100644
index 0000000..a470bcc
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source reader that is returned by
+ * {@link ReadSupport#createReader(DataSourceOptions)} or
+ * {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
+ * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
+ * logic is delegated to {@link DataReaderFactory}s that are returned by
+ * {@link #createDataReaderFactories()}.
+ *
+ * There are mainly 3 kinds of query optimizations:
+ *   1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
+ *      pruning), etc. Names of these interfaces start with `SupportsPushDown`.
+ *   2. Information Reporting. E.g., statistics reporting, ordering reporting, etc.
+ *      Names of these interfaces start with `SupportsReporting`.
+ *   3. Special scans. E.g, columnar scan, unsafe row scan, etc.
+ *      Names of these interfaces start with `SupportsScan`. Note that a reader should only
+ *      implement at most one of the special scans, if more than one special scans are implemented,
+ *      only one of them would be respected, according to the priority list from high to low:
+ *      {@link SupportsScanColumnarBatch}, {@link SupportsScanUnsafeRow}.
+ *
+ * If an exception was throw when applying any of these query optimizations, the action would fail
+ * and no Spark job was submitted.
+ *
+ * Spark first applies all operator push-down optimizations that this data source supports. Then
+ * Spark collects information this data source reported for further optimizations. Finally Spark
+ * issues the scan request and does the actual data reading.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceReader {
+
+  /**
+   * Returns the actual schema of this data source reader, which may be different from the physical
+   * schema of the underlying storage, as column pruning or other optimizations may happen.
+   *
+   * If this method fails (by throwing an exception), the action would fail and no Spark job was
+   * submitted.
+   */
+  StructType readSchema();
+
+  /**
+   * Returns a list of reader factories. Each factory is responsible for creating a data reader to
+   * output data for one RDD partition. That means the number of factories returned here is same as
+   * the number of RDD partitions this scan outputs.
+   *
+   * Note that, this may not be a full scan if the data source reader mixes in other optimization
+   * interfaces like column pruning, filter push-down, etc. These optimizations are applied before
+   * Spark issues the scan request.
+   *
+   * If this method fails (by throwing an exception), the action would fail and no Spark job was
+   * submitted.
+   */
+  List<DataReaderFactory<Row>> createDataReaderFactories();
+}
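
As a rough illustration of the delegation described in the Javadoc above, here is a minimal sketch (not part of the diff) of a `DataSourceReader` whose scan logic lives in a `DataReaderFactory`/`DataReader` pair. It assumes the factory/reader contracts from the same `reader` package (`createDataReader()`, `next()`, `get()`, `close()`); the class names and the fixed single-column schema are hypothetical.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.types.StructType;

public class ExampleRangeReader implements DataSourceReader {

  @Override
  public StructType readSchema() {
    // Fixed single-column schema, purely for illustration.
    return new StructType().add("value", "int");
  }

  @Override
  public List<DataReaderFactory<Row>> createDataReaderFactories() {
    // One factory per output partition; here two partitions covering 0..9.
    return Arrays.asList(
        new RangeReaderFactory(0, 5),
        new RangeReaderFactory(5, 10));
  }

  static class RangeReaderFactory implements DataReaderFactory<Row> {
    private final int start;
    private final int end;

    RangeReaderFactory(int start, int end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public DataReader<Row> createDataReader() {
      return new DataReader<Row>() {
        private int current = start - 1;

        @Override
        public boolean next() throws IOException {
          current += 1;
          return current < end;
        }

        @Override
        public Row get() {
          return RowFactory.create(current);
        }

        @Override
        public void close() throws IOException {
          // Nothing to release for this in-memory range.
        }
      };
    }
  }
}
```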

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
deleted file mode 100644
index 0180cd9..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.util.List;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A data source reader that is returned by
- * {@link org.apache.spark.sql.sources.v2.ReadSupport#createReader(
- * org.apache.spark.sql.sources.v2.DataSourceV2Options)} or
- * {@link org.apache.spark.sql.sources.v2.ReadSupportWithSchema#createReader(
- * StructType, org.apache.spark.sql.sources.v2.DataSourceV2Options)}.
- * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
- * logic is delegated to {@link DataReaderFactory}s that are returned by
- * {@link #createDataReaderFactories()}.
- *
- * There are mainly 3 kinds of query optimizations:
- *   1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
- *      pruning), etc. Names of these interfaces start with `SupportsPushDown`.
- *   2. Information Reporting. E.g., statistics reporting, ordering reporting, etc.
- *      Names of these interfaces start with `SupportsReporting`.
- *   3. Special scans. E.g, columnar scan, unsafe row scan, etc.
- *      Names of these interfaces start with `SupportsScan`. Note that a reader should only
- *      implement at most one of the special scans, if more than one special scans are implemented,
- *      only one of them would be respected, according to the priority list from high to low:
- *      {@link SupportsScanColumnarBatch}, {@link SupportsScanUnsafeRow}.
- *
- * If an exception was throw when applying any of these query optimizations, the action would fail
- * and no Spark job was submitted.
- *
- * Spark first applies all operator push-down optimizations that this data source supports. Then
- * Spark collects information this data source reported for further optimizations. Finally Spark
- * issues the scan request and does the actual data reading.
- */
-@InterfaceStability.Evolving
-public interface DataSourceV2Reader {
-
-  /**
-   * Returns the actual schema of this data source reader, which may be different from the physical
-   * schema of the underlying storage, as column pruning or other optimizations may happen.
-   *
-   * If this method fails (by throwing an exception), the action would fail and no Spark job was
-   * submitted.
-   */
-  StructType readSchema();
-
-  /**
-   * Returns a list of reader factories. Each factory is responsible for creating a data reader to
-   * output data for one RDD partition. That means the number of factories returned here is same as
-   * the number of RDD partitions this scan outputs.
-   *
-   * Note that, this may not be a full scan if the data source reader mixes in other optimization
-   * interfaces like column pruning, filter push-down, etc. These optimizations are applied before
-   * Spark issues the scan request.
-   *
-   * If this method fails (by throwing an exception), the action would fail and no Spark job was
-   * submitted.
-   */
-  List<DataReaderFactory<Row>> createDataReaderFactories();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
index f76c687..9822410 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.catalyst.expressions.Expression;
 
 /**
- * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to push down arbitrary expressions as predicates to the data source.
  * This is an experimental and unstable interface as {@link Expression} is not public and may get
  * changed in the future Spark versions.
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression;
  * process this interface.
  */
 @InterfaceStability.Unstable
-public interface SupportsPushDownCatalystFilters {
+public interface SupportsPushDownCatalystFilters extends DataSourceReader {
 
   /**
    * Pushes down filters, and returns unsupported filters.

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
index 6b0c9d4..f35c711 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.Filter;
 
 /**
- * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to push down filters to the data source and reduce the size of the data to be read.
  *
  * Note that, if data source readers implement both this interface and
@@ -29,7 +29,7 @@ import org.apache.spark.sql.sources.Filter;
  * {@link SupportsPushDownCatalystFilters}.
  */
 @InterfaceStability.Evolving
-public interface SupportsPushDownFilters {
+public interface SupportsPushDownFilters extends DataSourceReader {
 
   /**
    * Pushes down filters, and returns unsupported filters.

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
index fe0ac8e..427b4d0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
@@ -21,12 +21,12 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to push down required columns to the data source and only read these columns during
  * scan to reduce the size of the data to be read.
  */
 @InterfaceStability.Evolving
-public interface SupportsPushDownRequiredColumns {
+public interface SupportsPushDownRequiredColumns extends DataSourceReader {
 
   /**
    * Applies column pruning w.r.t. the given requiredSchema.
@@ -35,7 +35,7 @@ public interface SupportsPushDownRequiredColumns {
    * also OK to do the pruning partially, e.g., a data source may not be able to prune nested
    * fields, and only prune top-level columns.
    *
-   * Note that, data source readers should update {@link DataSourceV2Reader#readSchema()} after
+   * Note that, data source readers should update {@link DataSourceReader#readSchema()} after
    * applying column pruning.
    */
   void pruneColumns(StructType requiredSchema);
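
A small sketch (not part of the diff) of how a reader uses this mix-in now that it extends `DataSourceReader` directly: `pruneColumns` records the required schema and `readSchema()` reflects it afterwards. The class name and initial schema are hypothetical.

```java
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructType;

public class PruningExampleReader implements DataSourceReader, SupportsPushDownRequiredColumns {
  // Full schema before pruning; readSchema() must reflect any pruning applied.
  private StructType schema = new StructType().add("i", "int").add("j", "int");

  @Override
  public void pruneColumns(StructType requiredSchema) {
    // Keep only the columns Spark actually needs for this query.
    this.schema = requiredSchema;
  }

  @Override
  public StructType readSchema() {
    return schema;
  }

  @Override
  public List<DataReaderFactory<Row>> createDataReaderFactories() {
    // Factories omitted; a real reader would honor `schema` when producing rows.
    return Collections.emptyList();
  }
}
```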

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
index f786472..a2383a9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
@@ -20,11 +20,11 @@ package org.apache.spark.sql.sources.v2.reader;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A mix in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to report data partitioning and try to avoid shuffle at Spark side.
  */
 @InterfaceStability.Evolving
-public interface SupportsReportPartitioning {
+public interface SupportsReportPartitioning extends DataSourceReader {
 
   /**
    * Returns the output data partitioning that this reader guarantees.

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
index c019d2f..11bb13f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
@@ -20,11 +20,11 @@ package org.apache.spark.sql.sources.v2.reader;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A mix in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to report statistics to Spark.
  */
 @InterfaceStability.Evolving
-public interface SupportsReportStatistics {
+public interface SupportsReportStatistics extends DataSourceReader {
 
   /**
    * Returns the basic statistics of this data source.

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
index 67da555..2e5cfa7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
@@ -24,11 +24,11 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 /**
- * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to output {@link ColumnarBatch} and make the scan faster.
  */
 @InterfaceStability.Evolving
-public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
+public interface SupportsScanColumnarBatch extends DataSourceReader {
   @Override
   default List<DataReaderFactory<Row>> createDataReaderFactories() {
     throw new IllegalStateException(
@@ -36,7 +36,7 @@ public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
   }
 
   /**
-   * Similar to {@link DataSourceV2Reader#createDataReaderFactories()}, but returns columnar data
+   * Similar to {@link DataSourceReader#createDataReaderFactories()}, but returns columnar data
    * in batches.
    */
   List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories();

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
index 156af69..9cd749e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
@@ -24,13 +24,13 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 
 /**
- * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to output {@link UnsafeRow} directly and avoid the row copy at Spark side.
  * This is an experimental and unstable interface, as {@link UnsafeRow} is not public and may get
  * changed in the future Spark versions.
  */
 @InterfaceStability.Unstable
-public interface SupportsScanUnsafeRow extends DataSourceV2Reader {
+public interface SupportsScanUnsafeRow extends DataSourceReader {
 
   @Override
   default List<DataReaderFactory<Row>> createDataReaderFactories() {
@@ -39,7 +39,7 @@ public interface SupportsScanUnsafeRow extends DataSourceV2Reader {
   }
 
   /**
-   * Similar to {@link DataSourceV2Reader#createDataReaderFactories()},
+   * Similar to {@link DataSourceReader#createDataReaderFactories()},
    * but returns data in unsafe row format.
    */
   List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories();

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
index 9a93a80..f79424e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
@@ -21,7 +21,7 @@ import java.util.Optional;
 
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
 import org.apache.spark.sql.types.StructType;
 
@@ -44,5 +44,5 @@ public interface ContinuousReadSupport extends DataSourceV2 {
   ContinuousReader createContinuousReader(
     Optional<StructType> schema,
     String checkpointLocation,
-    DataSourceV2Options options);
+    DataSourceOptions options);
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
index 3b357c0..22660e4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
@@ -20,8 +20,8 @@ package org.apache.spark.sql.sources.v2.streaming;
 import java.util.Optional;
 
 import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
 import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
 import org.apache.spark.sql.types.StructType;
 
@@ -50,5 +50,5 @@ public interface MicroBatchReadSupport extends DataSourceV2 {
   MicroBatchReader createMicroBatchReader(
       Optional<StructType> schema,
       String checkpointLocation,
-      DataSourceV2Options options);
+      DataSourceOptions options);
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
index 6cd219c..7c5f304 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
@@ -19,10 +19,10 @@ package org.apache.spark.sql.sources.v2.streaming;
 
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
 import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.types.StructType;
 
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType;
  * provide data writing ability for structured streaming.
  */
 @InterfaceStability.Evolving
-public interface StreamWriteSupport extends BaseStreamingSink {
+public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
 
     /**
      * Creates an optional {@link StreamWriter} to save the data to this data source. Data
@@ -39,7 +39,7 @@ public interface StreamWriteSupport extends BaseStreamingSink {
      *
      * @param queryId A unique string for the writing query. It's possible that there are many
      *                writing queries running at the same time, and the returned
-     *                {@link DataSourceV2Writer} can use this id to distinguish itself from others.
+     *                {@link DataSourceWriter} can use this id to distinguish itself from others.
      * @param schema the schema of the data to be written.
      * @param mode the output mode which determines what successive epoch output means to this
      *             sink, please refer to {@link OutputMode} for more details.
@@ -50,5 +50,5 @@ public interface StreamWriteSupport extends BaseStreamingSink {
         String queryId,
         StructType schema,
         OutputMode mode,
-        DataSourceV2Options options);
+        DataSourceOptions options);
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
index 3ac979c..6e5177e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
@@ -19,12 +19,12 @@ package org.apache.spark.sql.sources.v2.streaming.reader;
 
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 
 import java.util.Optional;
 
 /**
- * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to allow reading in a continuous processing mode stream.
  *
  * Implementations must ensure each reader factory output is a {@link ContinuousDataReader}.
@@ -33,7 +33,7 @@ import java.util.Optional;
  * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
  */
 @InterfaceStability.Evolving
-public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reader {
+public interface ContinuousReader extends BaseStreamingSource, DataSourceReader {
     /**
      * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
      * partition to a single global offset.

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
index 68887e5..fcec446 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
@@ -18,20 +18,20 @@
 package org.apache.spark.sql.sources.v2.streaming.reader;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
 
 import java.util.Optional;
 
 /**
- * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to indicate they allow micro-batch streaming reads.
  *
  * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
  * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
  */
 @InterfaceStability.Evolving
-public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
+public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource {
     /**
      * Set the desired offset range for reader factories created from this reader. Reader factories
      * will generate only data within (`start`, `end`]; that is, from the first record after `start`

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
index 3156c88..915ee6c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
@@ -18,19 +18,19 @@
 package org.apache.spark.sql.sources.v2.streaming.writer;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 
 /**
- * A {@link DataSourceV2Writer} for use with structured streaming. This writer handles commits and
+ * A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and
  * aborts relative to an epoch ID determined by the execution engine.
  *
  * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
  * and so must reset any internal state after a successful commit.
  */
 @InterfaceStability.Evolving
-public interface StreamWriter extends DataSourceV2Writer {
+public interface StreamWriter extends DataSourceWriter {
   /**
    * Commits this writing job for the specified epoch with a list of commit messages. The commit
    * messages are collected from successful data writers and are produced by

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
deleted file mode 100644
index 8048f50..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
-import org.apache.spark.sql.sources.v2.WriteSupport;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A data source writer that is returned by
- * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}/
- * {@link org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport#createStreamWriter(
- * String, StructType, OutputMode, DataSourceV2Options)}.
- * It can mix in various writing optimization interfaces to speed up the data saving. The actual
- * writing logic is delegated to {@link DataWriter}.
- *
- * If an exception was throw when applying any of these writing optimizations, the action would fail
- * and no Spark job was submitted.
- *
- * The writing procedure is:
- *   1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the
- *      partitions of the input data(RDD).
- *   2. For each partition, create the data writer, and write the data of the partition with this
- *      writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
- *      exception happens during the writing, call {@link DataWriter#abort()}.
- *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
- *      some writers are aborted, or the job failed with an unknown reason, call
- *      {@link #abort(WriterCommitMessage[])}.
- *
- * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
- * do it manually in their Spark applications if they want to retry.
- *
- * Please refer to the documentation of commit/abort methods for detailed specifications.
- */
-@InterfaceStability.Evolving
-public interface DataSourceV2Writer {
-
-  /**
-   * Creates a writer factory which will be serialized and sent to executors.
-   *
-   * If this method fails (by throwing an exception), the action would fail and no Spark job was
-   * submitted.
-   */
-  DataWriterFactory<Row> createWriterFactory();
-
-  /**
-   * Commits this writing job with a list of commit messages. The commit messages are collected from
-   * successful data writers and are produced by {@link DataWriter#commit()}.
-   *
-   * If this method fails (by throwing an exception), this writing job is considered to to have been
-   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
-   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
-   *
-   * Note that, one partition may have multiple committed data writers because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
-   * writer can commit, or have a way to clean up the data of already-committed writers.
-   */
-  void commit(WriterCommitMessage[] messages);
-
-  /**
-   * Aborts this writing job because some data writers are failed and keep failing when retry, or
-   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
-   *
-   * If this method fails (by throwing an exception), the underlying data source may require manual
-   * cleanup.
-   *
-   * Unless the abort is triggered by the failure of commit, the given messages should have some
-   * null slots as there maybe only a few data writers that are committed before the abort
-   * happens, or some data writers were committed but their commit messages haven't reached the
-   * driver when the abort is triggered. So this is just a "best effort" for data sources to
-   * clean up the data left by data writers.
-   */
-  void abort(WriterCommitMessage[] messages);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
new file mode 100644
index 0000000..d89d27d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceOptions)}/
+ * {@link org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport#createStreamWriter(
+ * String, StructType, OutputMode, DataSourceOptions)}.
+ * It can mix in various writing optimization interfaces to speed up the data saving. The actual
+ * writing logic is delegated to {@link DataWriter}.
+ *
+ * If an exception is thrown while applying any of these writing optimizations, the action fails
+ * and no Spark job is submitted.
+ *
+ * The writing procedure is:
+ *   1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the
+ *      partitions of the input data (RDD).
+ *   2. For each partition, create the data writer, and write the data of the partition with this
+ *      writer. If all the data is written successfully, call {@link DataWriter#commit()}. If an
+ *      exception happens during writing, call {@link DataWriter#abort()}.
+ *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
+ *      some writers are aborted, or the job failed with an unknown reason, call
+ *      {@link #abort(WriterCommitMessage[])}.
+ *
+ * While Spark will retry failed writing tasks, it won't retry failed writing jobs. Users should
+ * retry manually in their Spark applications if they need to.
+ *
+ * Please refer to the documentation of commit/abort methods for detailed specifications.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceWriter {
+
+  /**
+   * Creates a writer factory which will be serialized and sent to executors.
+   *
+   * If this method fails (by throwing an exception), the action fails and no Spark job is
+   * submitted.
+   */
+  DataWriterFactory<Row> createWriterFactory();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit messages are collected from
+   * successful data writers and are produced by {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and {@link #abort(WriterCommitMessage[])} will be called. The state of the destination
+   * is undefined and {@link #abort(WriterCommitMessage[])} may not be able to deal with it.
+   *
+   * Note that one partition may have multiple committed data writers because of speculative tasks.
+   * Spark will pick the first successful one and get its commit message. Implementations should be
+   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
+   * writer can commit, or have a way to clean up the data of already-committed writers.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers failed and kept failing when retried, or
+   * the Spark job failed for some unknown reason, or {@link #commit(WriterCommitMessage[])} failed.
+   *
+   * If this method fails (by throwing an exception), the underlying data source may require manual
+   * cleanup.
+   *
+   * Unless the abort is triggered by the failure of commit, the given messages may have some
+   * null slots, as there may be only a few data writers that committed before the abort
+   * happens, or some data writers committed but their commit messages haven't reached the
+   * driver when the abort is triggered. So this is just a "best effort" for data sources to
+   * clean up the data left by data writers.
+   */
+  void abort(WriterCommitMessage[] messages);
+}
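
To make the three-step procedure in the javadoc above concrete, here is a minimal, hypothetical sink assembled from the renamed interfaces. The DataSourceWriter methods, DataWriterFactory#createDataWriter(int, int), DataWriter#commit()/abort() and WriterCommitMessage come from this patch; the DataWriter#write signature and every class name below are assumptions for illustration only.

import java.io.Serializable;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Hypothetical sink that only counts rows; it shows where each interface sits
// in the write path rather than how a real data source persists data.
public class CountingDataSourceWriter implements DataSourceWriter {

  // Step 1: the factory is serialized and sent to every partition of the input RDD.
  @Override
  public DataWriterFactory<Row> createWriterFactory() {
    return new CountingWriterFactory();
  }

  // Step 3a: every partition committed; the messages were produced by DataWriter#commit().
  @Override
  public void commit(WriterCommitMessage[] messages) {
    long total = 0;
    for (WriterCommitMessage m : messages) {
      total += ((CountMessage) m).count;
    }
    System.out.println("job committed, rows written: " + total);
  }

  // Step 3b: some writer aborted or the job failed; clean up on a best-effort basis.
  @Override
  public void abort(WriterCommitMessage[] messages) {
    System.err.println("job aborted, discarding partial results");
  }

  // Step 2 happens on executors: one DataWriter per partition (and per attempt).
  static class CountingWriterFactory implements DataWriterFactory<Row>, Serializable {
    @Override
    public DataWriter<Row> createDataWriter(int partitionId, int attemptNumber) {
      return new CountingDataWriter();
    }
  }

  static class CountingDataWriter implements DataWriter<Row> {
    private long count = 0;

    public void write(Row record) {       // signature assumed, not shown in this patch
      count++;
    }

    public WriterCommitMessage commit() { // message is sent back to the driver
      return new CountMessage(count);
    }

    public void abort() {
      count = 0;                          // nothing durable to clean up
    }
  }

  static class CountMessage implements WriterCommitMessage, Serializable {
    final long count;
    CountMessage(long count) { this.count = count; }
  }
}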

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
index 04b03e6..53941a8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
@@ -33,11 +33,11 @@ import org.apache.spark.annotation.InterfaceStability;
  *
  * If this data writer succeeds(all records are successfully written and {@link #commit()}
  * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
- * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} with commit messages from other data
+ * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
  * writers. If this data writer fails (one record fails to write or {@link #commit()} fails), an
  * exception will be sent to the driver side, and Spark will retry this writing task a few times,
  * each time {@link DataWriterFactory#createDataWriter(int, int)} gets a different `attemptNumber`,
- * and finally call {@link DataSourceV2Writer#abort(WriterCommitMessage[])} if all retry fail.
+ * and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} if all retries fail.
  *
  * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
  * takes too long to finish. Different from retried tasks, which are launched one by one after the
@@ -69,11 +69,11 @@ public interface DataWriter<T> {
   /**
    * Commits this writer after all records are written successfully, and returns a commit message
    * which will be sent back to the driver side and passed to
-   * {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
+   * {@link DataSourceWriter#commit(WriterCommitMessage[])}.
    *
    * The written data should only be visible to data source readers after
-   * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} succeeds, which means this method
-   * should still "hide" the written data and ask the {@link DataSourceV2Writer} at driver side to
+   * {@link DataSourceWriter#commit(WriterCommitMessage[])} succeeds, which means this method
+   * should still "hide" the written data and ask the {@link DataSourceWriter} at the driver side to
    * do the final commit via {@link WriterCommitMessage}.
    *
    * If this method fails (by throwing an exception), {@link #abort()} will be called and this
@@ -91,7 +91,7 @@ public interface DataWriter<T> {
    * failed.
    *
    * If this method fails (by throwing an exception), the underlying data source may have garbage
-   * that need to be cleaned by {@link DataSourceV2Writer#abort(WriterCommitMessage[])} or manually,
+   * that needs to be cleaned up by {@link DataSourceWriter#abort(WriterCommitMessage[])} or manually,
    * but this garbage should not be visible to data source readers.
    *
    * @throws IOException if failure happens during disk/network IO like writing files.
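
The "hide the written data until the driver commits" contract is easiest to see with a staging pattern. The sketch below is hypothetical (the staging paths, class names and the write(Row) signature are assumptions): each task attempt writes to its own staging file and only reports the file's location to the driver, which decides in DataSourceWriter#commit whether that data ever becomes visible.

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Hypothetical task-side writer that stages its output in a temporary file.
public class StagingDataWriter implements DataWriter<Row> {
  private final Path stagingFile;
  private final BufferedWriter out;

  public StagingDataWriter(int partitionId, int attemptNumber) throws IOException {
    // One staging file per (partition, attempt): retried and speculative
    // attempts never clobber each other.
    this.stagingFile =
        Paths.get("/tmp", "staging-" + partitionId + "-" + attemptNumber + ".txt");
    this.out = Files.newBufferedWriter(stagingFile);
  }

  public void write(Row record) {          // signature assumed
    try {
      out.write(record.toString());
      out.newLine();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public WriterCommitMessage commit() {
    try {
      out.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // Report where the staged data lives; it stays invisible to readers until
    // the driver-side DataSourceWriter#commit publishes it.
    return new StagedFileMessage(stagingFile.toString());
  }

  public void abort() {
    try {
      out.close();
      Files.deleteIfExists(stagingFile);   // best-effort cleanup of this attempt
    } catch (IOException e) {
      // leftovers are handled by DataSourceWriter#abort or manual cleanup
    }
  }

  public static class StagedFileMessage implements WriterCommitMessage, Serializable {
    public final String path;
    public StagedFileMessage(String path) { this.path = path; }
  }
}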

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
index 18ec792..ea95442 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A factory of {@link DataWriter} returned by {@link DataSourceV2Writer#createWriterFactory()},
+ * A factory of {@link DataWriter} returned by {@link DataSourceWriter#createWriterFactory()},
  * which is responsible for creating and initializing the actual data writer at the executor side.
  *
  * Note that the writer factory will be serialized and sent to executors, then the data writer

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
index 3e05188..d2cf7e0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java
@@ -22,14 +22,14 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 /**
- * A mix-in interface for {@link DataSourceV2Writer}. Data source writers can implement this
+ * A mix-in interface for {@link DataSourceWriter}. Data source writers can implement this
  * interface to write {@link InternalRow} directly and avoid the row conversion on the Spark side.
  * This is an experimental and unstable interface, as {@link InternalRow} is not public and may
  * change in future Spark versions.
  */
 
 @InterfaceStability.Unstable
-public interface SupportsWriteInternalRow extends DataSourceV2Writer {
+public interface SupportsWriteInternalRow extends DataSourceWriter {
 
   @Override
   default DataWriterFactory<Row> createWriterFactory() {

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
index 082d6b5..9e38836 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
@@ -23,10 +23,10 @@ import org.apache.spark.annotation.InterfaceStability;
 
 /**
  * A commit message returned by {@link DataWriter#commit()}, which will be sent back to the driver side
- * as the input parameter of {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
+ * as the input parameter of {@link DataSourceWriter#commit(WriterCommitMessage[])}.
  *
  * This is an empty interface; data sources should define their own message class and use it in
- * their {@link DataWriter#commit()} and {@link DataSourceV2Writer#commit(WriterCommitMessage[])}
+ * their {@link DataWriter#commit()} and {@link DataSourceWriter#commit(WriterCommitMessage[])}
  * implementations.
  */
 @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b714a46..46b5f54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -186,7 +186,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val ds = cls.newInstance()
-      val options = new DataSourceV2Options((extraOptions ++
+      val options = new DataSourceOptions((extraOptions ++
         DataSourceV2Utils.extractSessionConfigs(
           ds = ds.asInstanceOf[DataSourceV2],
           conf = sparkSession.sessionState.conf)).asJava)

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5c02eae..ed7a910 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -243,7 +243,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val ds = cls.newInstance()
       ds match {
         case ws: WriteSupport =>
-          val options = new DataSourceV2Options((extraOptions ++
+          val options = new DataSourceOptions((extraOptions ++
             DataSourceV2Utils.extractSessionConfigs(
               ds = ds.asInstanceOf[DataSourceV2],
               conf = df.sparkSession.sessionState.conf)).asJava)
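
As the two call sites above suggest, the renamed DataSourceOptions wraps a plain string-to-string map. Below is a small hypothetical Java sketch; only the map-based constructor is visible in this patch, so the Optional-returning get accessor is an assumption.

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceOptions;

public class OptionsExample {
  public static void main(String[] args) {
    // Mirrors what DataFrameReader/DataFrameWriter do above: user-supplied
    // options plus extracted session configs end up in one string map.
    Map<String, String> raw = new HashMap<>();
    raw.put("path", "/tmp/output");                 // illustrative keys/values
    raw.put("checkpointLocation", "/tmp/checkpoints");

    DataSourceOptions options = new DataSourceOptions(raw);

    // The Optional-returning accessor is an assumption about the API; only the
    // map-based constructor appears in this diff.
    Optional<String> path = options.get("path");
    System.out.println(path.orElse("<no path set>"));
  }
}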

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
index 6093df2..6460c97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
@@ -35,7 +35,7 @@ trait DataSourceReaderHolder {
   /**
    * The held data source reader.
    */
-  def reader: DataSourceV2Reader
+  def reader: DataSourceReader
 
   /**
    * The metadata of this data source reader that can be used for equality test.

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index cba20dd..3d4c649 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.sources.v2.reader._
 
 case class DataSourceV2Relation(
     fullOutput: Seq[AttributeReference],
-    reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder {
+    reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder {
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
 
@@ -41,12 +41,12 @@ case class DataSourceV2Relation(
  */
 class StreamingDataSourceV2Relation(
     fullOutput: Seq[AttributeReference],
-    reader: DataSourceV2Reader) extends DataSourceV2Relation(fullOutput, reader) {
+    reader: DataSourceReader) extends DataSourceV2Relation(fullOutput, reader) {
   override def isStreaming: Boolean = true
 }
 
 object DataSourceV2Relation {
-  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+  def apply(reader: DataSourceReader): DataSourceV2Relation = {
     new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 3f808fb..ee08582 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.types.StructType
  */
 case class DataSourceV2ScanExec(
     fullOutput: Seq[AttributeReference],
-    @transient reader: DataSourceV2Reader)
+    @transient reader: DataSourceReader)
   extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index cd6b3e9..c544adb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.Utils
 /**
  * The logical plan for writing data into data source v2.
  */
-case class WriteToDataSourceV2(writer: DataSourceV2Writer, query: LogicalPlan) extends LogicalPlan {
+case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) extends LogicalPlan {
   override def children: Seq[LogicalPlan] = Seq(query)
   override def output: Seq[Attribute] = Nil
 }
@@ -43,7 +43,7 @@ case class WriteToDataSourceV2(writer: DataSourceV2Writer, query: LogicalPlan) e
 /**
  * The physical plan for writing data into data source v2.
  */
-case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan) extends SparkPlan {
+case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) extends SparkPlan {
   override def children: Seq[SparkPlan] = Seq(query)
   override def output: Seq[Attribute] = Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 9759752..93572f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
@@ -89,7 +89,7 @@ class MicroBatchExecution(
           val reader = source.createMicroBatchReader(
             Optional.empty(), // user specified schema
             metadataPath,
-            new DataSourceV2Options(options.asJava))
+            new DataSourceOptions(options.asJava))
           nextSourceId += 1
           StreamingExecutionRelation(reader, output)(sparkSession)
         })
@@ -447,7 +447,7 @@ class MicroBatchExecution(
           s"$runId",
           newAttributePlan.schema,
           outputMode,
-          new DataSourceV2Options(extraOptions.asJava))
+          new DataSourceOptions(extraOptions.asJava))
         if (writer.isInstanceOf[SupportsWriteInternalRow]) {
           WriteToDataSourceV2(
             new InternalRowMicroBatchWriter(currentBatchId, writer), newAttributePlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
index 66eb016..5e3fee6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -111,7 +111,7 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister
   override def createContinuousReader(
       schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceV2Options): ContinuousReader = {
+      options: DataSourceOptions): ContinuousReader = {
     new RateStreamContinuousReader(options)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index d5ac0bd..3f5bb48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
 import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
 import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
@@ -40,7 +40,7 @@ class ConsoleSinkProvider extends DataSourceV2
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceV2Options): StreamWriter = {
+      options: DataSourceOptions): StreamWriter = {
     new ConsoleWriter(schema, options)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 60f880f..9402d7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, PartitionOffset}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -160,7 +160,7 @@ class ContinuousExecution(
         dataSource.createContinuousReader(
           java.util.Optional.empty[StructType](),
           metadataPath,
-          new DataSourceV2Options(extraReaderOptions.asJava))
+          new DataSourceOptions(extraReaderOptions.asJava))
     }
     uniqueSources = continuousSources.distinct
 
@@ -198,7 +198,7 @@ class ContinuousExecution(
       s"$runId",
       triggerLogicalPlan.schema,
       outputMode,
-      new DataSourceV2Options(extraOptions.asJava))
+      new DataSourceOptions(extraOptions.asJava))
     val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
 
     val reader = withSink.collect {



