spark-commits mailing list archives

From: wenc...@apache.org
Subject: spark git commit: [SPARK-23219][SQL] Rename ReadTask to DataReaderFactory in data source v2
Date: Mon, 29 Jan 2018 16:51:27 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 8229e155d -> de66abafc


[SPARK-23219][SQL] Rename ReadTask to DataReaderFactory in data source v2

## What changes were proposed in this pull request?

Currently we have `ReadTask` in the data source v2 reader, while the writer has `DataWriterFactory`.
To make the naming consistent and clearer, this PR renames `ReadTask` to `DataReaderFactory`.
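
For illustration, a minimal sketch of the renamed API (hypothetical `Example*` classes, modeled on the simple test sources touched by this patch): the reader returns one `DataReaderFactory` per RDD partition, each factory is serialized to an executor, and the `DataReader` it creates performs the actual scan.

```scala
import java.util.{List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceV2Reader}
import org.apache.spark.sql.types.StructType

// Hypothetical batch reader: one factory per RDD partition. Factories are
// created on the driver and shipped to executors.
class ExampleReader extends DataSourceV2Reader {
  override def readSchema(): StructType = new StructType().add("i", "int")

  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
    java.util.Arrays.asList(
      new ExampleDataReaderFactory(0, 5),
      new ExampleDataReaderFactory(5, 10))
  }
}

// Serializable factory (driver side) that builds the DataReader (executor side).
class ExampleDataReaderFactory(start: Int, end: Int) extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] = new ExampleDataReader(start, end)
}

class ExampleDataReader(start: Int, end: Int) extends DataReader[Row] {
  private var current = start - 1
  override def next(): Boolean = { current += 1; current < end }
  override def get(): Row = Row(current)
  override def close(): Unit = {}
}
```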

## How was this patch tested?

Unit test

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #20397 from gengliangwang/rename.

(cherry picked from commit badf0d0e0d1d9aa169ed655176ce9ae684d3905d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de66abaf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de66abaf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de66abaf

Branch: refs/heads/branch-2.3
Commit: de66abafcf081c5cc4d1556d21c8ec21e1fefdf5
Parents: 8229e15
Author: Wang Gengliang <ltnwgl@gmail.com>
Authored: Tue Jan 30 00:50:49 2018 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Tue Jan 30 00:51:14 2018 +0800

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala    | 16 ++---
 .../v2/reader/ClusteredDistribution.java        |  2 +-
 .../spark/sql/sources/v2/reader/DataReader.java |  2 +-
 .../sources/v2/reader/DataReaderFactory.java    | 61 ++++++++++++++++++++
 .../sources/v2/reader/DataSourceV2Reader.java   | 11 ++--
 .../sql/sources/v2/reader/Distribution.java     |  6 +-
 .../sql/sources/v2/reader/Partitioning.java     |  2 +-
 .../spark/sql/sources/v2/reader/ReadTask.java   | 59 -------------------
 .../v2/reader/SupportsScanColumnarBatch.java    | 11 ++--
 .../v2/reader/SupportsScanUnsafeRow.java        |  9 +--
 .../v2/streaming/MicroBatchReadSupport.java     |  4 +-
 .../v2/streaming/reader/ContinuousReader.java   | 14 ++---
 .../v2/streaming/reader/MicroBatchReader.java   |  6 +-
 .../datasources/v2/DataSourceRDD.scala          | 14 ++---
 .../datasources/v2/DataSourceV2ScanExec.scala   | 25 ++++----
 .../ContinuousDataSourceRDDIter.scala           | 11 ++--
 .../continuous/ContinuousRateStreamSource.scala | 10 ++--
 .../streaming/sources/RateStreamSourceV2.scala  |  6 +-
 .../sources/v2/JavaAdvancedDataSourceV2.java    | 20 +++----
 .../sql/sources/v2/JavaBatchDataSourceV2.java   | 10 ++--
 .../v2/JavaPartitionAwareDataSource.java        | 10 ++--
 .../v2/JavaSchemaRequiredDataSource.java        |  4 +-
 .../sql/sources/v2/JavaSimpleDataSourceV2.java  | 14 ++---
 .../sources/v2/JavaUnsafeRowDataSourceV2.java   | 13 +++--
 .../execution/streaming/RateSourceV2Suite.scala | 10 ++--
 .../sql/sources/v2/DataSourceV2Suite.scala      | 59 ++++++++++---------
 .../sources/v2/SimpleWritableDataSource.scala   | 12 ++--
 .../sources/StreamingDataSourceV2Suite.scala    |  4 +-
 28 files changed, 221 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index fc97797..9125cf5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -63,7 +63,7 @@ class KafkaContinuousReader(
 
   private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
 
-  // Initialized when creating read tasks. If this diverges from the partitions at the latest
+  // Initialized when creating reader factories. If this diverges from the partitions at the latest
   // offsets, we need to reconfigure.
   // Exposed outside this object only for unit tests.
   private[sql] var knownPartitions: Set[TopicPartition] = _
@@ -89,7 +89,7 @@ class KafkaContinuousReader(
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
 
-  override def createUnsafeRowReadTasks(): ju.List[ReadTask[UnsafeRow]] = {
+  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
     import scala.collection.JavaConverters._
 
     val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
@@ -109,9 +109,9 @@ class KafkaContinuousReader(
 
     startOffsets.toSeq.map {
       case (topicPartition, start) =>
-        KafkaContinuousReadTask(
+        KafkaContinuousDataReaderFactory(
           topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
-          .asInstanceOf[ReadTask[UnsafeRow]]
+          .asInstanceOf[DataReaderFactory[UnsafeRow]]
     }.asJava
   }
 
@@ -149,8 +149,8 @@ class KafkaContinuousReader(
 }
 
 /**
- * A read task for continuous Kafka processing. This will be serialized and transformed into a
- * full reader on executors.
+ * A data reader factory for continuous Kafka processing. This will be serialized and transformed
+ * into a full reader on executors.
  *
  * @param topicPartition The (topic, partition) pair this task is responsible for.
  * @param startOffset The offset to start reading from within the partition.
@@ -159,12 +159,12 @@ class KafkaContinuousReader(
  * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
  *                       are skipped.
  */
-case class KafkaContinuousReadTask(
+case class KafkaContinuousDataReaderFactory(
     topicPartition: TopicPartition,
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ReadTask[UnsafeRow] {
+    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
   override def createDataReader(): KafkaContinuousDataReader = {
     new KafkaContinuousDataReader(
       topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
index 7346500..27905e3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
@@ -22,7 +22,7 @@ import org.apache.spark.annotation.InterfaceStability;
 /**
  * A concrete implementation of {@link Distribution}. Represents a distribution where records that
  * share the same values for the {@link #clusteredColumns} will be produced by the same
- * {@link ReadTask}.
+ * {@link DataReader}.
  */
 @InterfaceStability.Evolving
 public class ClusteredDistribution implements Distribution {

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
index 8f58c86..bb9790a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data reader returned by {@link ReadTask#createDataReader()} and is responsible for
+ * A data reader returned by {@link DataReaderFactory#createDataReader()} and is responsible for
  * outputting data for a RDD partition.
  *
  * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java
new file mode 100644
index 0000000..077b95b
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A reader factory returned by {@link DataSourceV2Reader#createDataReaderFactories()} and is
+ * responsible for creating the actual data reader. The relationship between
+ * {@link DataReaderFactory} and {@link DataReader}
+ * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
+ *
+ * Note that, the reader factory will be serialized and sent to executors, then the data reader
+ * will be created on executors and do the actual reading. So {@link DataReaderFactory} must be
+ * serializable and {@link DataReader} doesn't need to be.
+ */
+@InterfaceStability.Evolving
+public interface DataReaderFactory<T> extends Serializable {
+
+  /**
+   * The preferred locations where the data reader returned by this reader factory can run faster,
+   * but Spark does not guarantee to run the data reader on these locations.
+   * The implementations should make sure that it can be run on any location.
+   * The location is a string representing the host name.
+   *
+   * Note that if a host name cannot be recognized by Spark, it will be ignored as it was not in
+   * the returned locations. By default this method returns empty string array, which means this
+   * task has no location preference.
+   *
+   * If this method fails (by throwing an exception), the action would fail and no Spark job was
+   * submitted.
+   */
+  default String[] preferredLocations() {
+    return new String[0];
+  }
+
+  /**
+   * Returns a data reader to do the actual reading work.
+   *
+   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   */
+  DataReader<T> createDataReader();
+}
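
As a usage note on the new interface above: a factory can override the default empty `preferredLocations()` to hint at data locality, while the reader it creates must still work on any executor. A minimal sketch with hypothetical names (`LocalityAwareFactory`; the host list is assumed to come from the source's own metadata):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}

// Hypothetical factory reporting locality hints. Spark treats the hosts only as
// a preference, so the reader must not depend on running there.
class LocalityAwareFactory(hosts: Array[String], values: Seq[Int])
  extends DataReaderFactory[Row] {

  override def preferredLocations(): Array[String] = hosts

  override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    private val iter = values.iterator
    private var current: Int = 0
    override def next(): Boolean = {
      if (iter.hasNext) { current = iter.next(); true } else false
    }
    override def get(): Row = Row(current)
    override def close(): Unit = {}
  }
}
```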

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
index f23c384..0180cd9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
@@ -30,7 +30,8 @@ import org.apache.spark.sql.types.StructType;
  * {@link org.apache.spark.sql.sources.v2.ReadSupportWithSchema#createReader(
  * StructType, org.apache.spark.sql.sources.v2.DataSourceV2Options)}.
  * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
- * logic is delegated to {@link ReadTask}s that are returned by {@link #createReadTasks()}.
+ * logic is delegated to {@link DataReaderFactory}s that are returned by
+ * {@link #createDataReaderFactories()}.
  *
  * There are mainly 3 kinds of query optimizations:
  *   1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
@@ -63,9 +64,9 @@ public interface DataSourceV2Reader {
   StructType readSchema();
 
   /**
-   * Returns a list of read tasks. Each task is responsible for outputting data for one RDD
-   * partition. That means the number of tasks returned here is same as the number of RDD
-   * partitions this scan outputs.
+   * Returns a list of reader factories. Each factory is responsible for creating a data reader to
+   * output data for one RDD partition. That means the number of factories returned here is same as
+   * the number of RDD partitions this scan outputs.
    *
    * Note that, this may not be a full scan if the data source reader mixes in other optimization
    * interfaces like column pruning, filter push-down, etc. These optimizations are applied before
@@ -74,5 +75,5 @@ public interface DataSourceV2Reader {
    * If this method fails (by throwing an exception), the action would fail and no Spark job was
    * submitted.
    */
-  List<ReadTask<Row>> createReadTasks();
+  List<DataReaderFactory<Row>> createDataReaderFactories();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java
index a6201a2..b375621 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java
@@ -21,9 +21,9 @@ import org.apache.spark.annotation.InterfaceStability;
 
 /**
  * An interface to represent data distribution requirement, which specifies how the records should
- * be distributed among the {@link ReadTask}s that are returned by
- * {@link DataSourceV2Reader#createReadTasks()}. Note that this interface has nothing to do with
- * the data ordering inside one partition(the output records of a single {@link ReadTask}).
+ * be distributed among the data partitions(one {@link DataReader} outputs data for one partition).
+ * Note that this interface has nothing to do with the data ordering inside one
+ * partition(the output records of a single {@link DataReader}).
  *
  * The instance of this interface is created and provided by Spark, then consumed by
  * {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java
index 199e45d..5e334d1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.InterfaceStability;
 public interface Partitioning {
 
   /**
-   * Returns the number of partitions(i.e., {@link ReadTask}s) the data source outputs.
+   * Returns the number of partitions(i.e., {@link DataReaderFactory}s) the data source outputs.
    */
   int numPartitions();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
deleted file mode 100644
index fa161cd..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.io.Serializable;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * A read task returned by {@link DataSourceV2Reader#createReadTasks()} and is responsible for
- * creating the actual data reader. The relationship between {@link ReadTask} and {@link DataReader}
- * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
- *
- * Note that, the read task will be serialized and sent to executors, then the data reader will be
- * created on executors and do the actual reading. So {@link ReadTask} must be serializable and
- * {@link DataReader} doesn't need to be.
- */
-@InterfaceStability.Evolving
-public interface ReadTask<T> extends Serializable {
-
-  /**
-   * The preferred locations where this read task can run faster, but Spark does not guarantee that
-   * this task will always run on these locations. The implementations should make sure that it can
-   * be run on any location. The location is a string representing the host name.
-   *
-   * Note that if a host name cannot be recognized by Spark, it will be ignored as it was not in
-   * the returned locations. By default this method returns empty string array, which means this
-   * task has no location preference.
-   *
-   * If this method fails (by throwing an exception), the action would fail and no Spark job was
-   * submitted.
-   */
-  default String[] preferredLocations() {
-    return new String[0];
-  }
-
-  /**
-   * Returns a data reader to do the actual reading work for this read task.
-   *
-   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
-   * get retried until hitting the maximum retry times.
-   */
-  DataReader<T> createDataReader();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
index 27cf3a7..67da555 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
@@ -30,21 +30,22 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 @InterfaceStability.Evolving
 public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
   @Override
-  default List<ReadTask<Row>> createReadTasks() {
+  default List<DataReaderFactory<Row>> createDataReaderFactories() {
     throw new IllegalStateException(
-      "createReadTasks not supported by default within SupportsScanColumnarBatch.");
+      "createDataReaderFactories not supported by default within SupportsScanColumnarBatch.");
   }
 
   /**
-   * Similar to {@link DataSourceV2Reader#createReadTasks()}, but returns columnar data in batches.
+   * Similar to {@link DataSourceV2Reader#createDataReaderFactories()}, but returns columnar data
+   * in batches.
    */
-  List<ReadTask<ColumnarBatch>> createBatchReadTasks();
+  List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories();
 
   /**
    * Returns true if the concrete data source reader can read data in batch according to the scan
    * properties like required columns, pushes filters, etc. It's possible that the implementation
    * can only support some certain columns with certain types. Users can overwrite this method and
-   * {@link #createReadTasks()} to fallback to normal read path under some conditions.
+   * {@link #createDataReaderFactories()} to fallback to normal read path under some conditions.
    */
   default boolean enableBatchRead() {
     return true;

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
index 2d3ad0e..156af69 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
@@ -33,13 +33,14 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 public interface SupportsScanUnsafeRow extends DataSourceV2Reader {
 
   @Override
-  default List<ReadTask<Row>> createReadTasks() {
+  default List<DataReaderFactory<Row>> createDataReaderFactories() {
     throw new IllegalStateException(
-      "createReadTasks not supported by default within SupportsScanUnsafeRow");
+      "createDataReaderFactories not supported by default within SupportsScanUnsafeRow");
   }
 
   /**
-   * Similar to {@link DataSourceV2Reader#createReadTasks()}, but returns data in unsafe row format.
+   * Similar to {@link DataSourceV2Reader#createDataReaderFactories()},
+   * but returns data in unsafe row format.
    */
-  List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks();
+  List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
index 3c87a3d..3b357c0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
@@ -36,8 +36,8 @@ public interface MicroBatchReadSupport extends DataSourceV2 {
    * streaming query.
    *
    * The execution engine will create a micro-batch reader at the start of a streaming query,
-   * alternate calls to setOffsetRange and createReadTasks for each batch to process, and then
-   * call stop() when the execution is complete. Note that a single query may have multiple
+   * alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and
+   * then call stop() when the execution is complete. Note that a single query may have multiple
    * executions due to restart or failure recovery.
    *
    * @param schema the user provided schema, or empty() if none was provided

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
index 745f1ce..3ac979c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
@@ -27,7 +27,7 @@ import java.util.Optional;
  * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
  * interface to allow reading in a continuous processing mode stream.
  *
- * Implementations must ensure each read task output is a {@link ContinuousDataReader}.
+ * Implementations must ensure each reader factory output is a {@link ContinuousDataReader}.
  *
  * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
  * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
@@ -47,9 +47,9 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reade
     Offset deserializeOffset(String json);
 
     /**
-     * Set the desired start offset for read tasks created from this reader. The scan will start
-     * from the first record after the provided offset, or from an implementation-defined inferred
-     * starting point if no offset is provided.
+     * Set the desired start offset for reader factories created from this reader. The scan will
+     * start from the first record after the provided offset, or from an implementation-defined
+     * inferred starting point if no offset is provided.
      */
     void setOffset(Optional<Offset> start);
 
@@ -61,9 +61,9 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reade
     Offset getStartOffset();
 
     /**
-     * The execution engine will call this method in every epoch to determine if new read tasks need
-     * to be generated, which may be required if for example the underlying source system has had
-     * partitions added or removed.
+     * The execution engine will call this method in every epoch to determine if new reader
+     * factories need to be generated, which may be required if for example the underlying
+     * source system has had partitions added or removed.
      *
      * If true, the query will be shut down and restarted with a new reader.
      */

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
index 02f37ce..68887e5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
@@ -33,9 +33,9 @@ import java.util.Optional;
 @InterfaceStability.Evolving
 public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
     /**
-     * Set the desired offset range for read tasks created from this reader. Read tasks will
-     * generate only data within (`start`, `end`]; that is, from the first record after `start` to
-     * the record with offset `end`.
+     * Set the desired offset range for reader factories created from this reader. Reader factories
+     * will generate only data within (`start`, `end`]; that is, from the first record after `start`
+     * to the record with offset `end`.
      *
      * @param start The initial offset to scan from. If not specified, scan from an
      *              implementation-specified start point, such as the earliest available record.

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
index ac104d7..5ed0ba7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -22,24 +22,24 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources.v2.reader.ReadTask
+import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
 
-class DataSourceRDDPartition[T : ClassTag](val index: Int, val readTask: ReadTask[T])
+class DataSourceRDDPartition[T : ClassTag](val index: Int, val readerFactory: DataReaderFactory[T])
   extends Partition with Serializable
 
 class DataSourceRDD[T: ClassTag](
     sc: SparkContext,
-    @transient private val readTasks: java.util.List[ReadTask[T]])
+    @transient private val readerFactories: java.util.List[DataReaderFactory[T]])
   extends RDD[T](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
-    readTasks.asScala.zipWithIndex.map {
-      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+    readerFactories.asScala.zipWithIndex.map {
+      case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
     }.toArray
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    val reader = split.asInstanceOf[DataSourceRDDPartition[T]].readTask.createDataReader()
+    val reader = split.asInstanceOf[DataSourceRDDPartition[T]].readerFactory.createDataReader()
     context.addTaskCompletionListener(_ => reader.close())
     val iter = new Iterator[T] {
       private[this] var valuePrepared = false
@@ -63,6 +63,6 @@ class DataSourceRDD[T: ClassTag](
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    split.asInstanceOf[DataSourceRDDPartition[T]].readTask.preferredLocations()
+    split.asInstanceOf[DataSourceRDDPartition[T]].readerFactory.preferredLocations()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 2c22239..3f808fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -51,11 +51,11 @@ case class DataSourceV2ScanExec(
     case _ => super.outputPartitioning
   }
 
-  private lazy val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
+  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
+    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
     case _ =>
-      reader.createReadTasks().asScala.map {
-        new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow]
+      reader.createDataReaderFactories().asScala.map {
+        new RowToUnsafeRowDataReaderFactory(_, reader.readSchema()): DataReaderFactory[UnsafeRow]
       }.asJava
   }
 
@@ -63,18 +63,19 @@ case class DataSourceV2ScanExec(
     case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
       assert(!reader.isInstanceOf[ContinuousReader],
         "continuous stream reader does not support columnar read yet.")
-      new DataSourceRDD(sparkContext, r.createBatchReadTasks()).asInstanceOf[RDD[InternalRow]]
+      new DataSourceRDD(sparkContext, r.createBatchDataReaderFactories())
+        .asInstanceOf[RDD[InternalRow]]
 
     case _: ContinuousReader =>
       EpochCoordinatorRef.get(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
-        .askSync[Unit](SetReaderPartitions(readTasks.size()))
-      new ContinuousDataSourceRDD(sparkContext, sqlContext, readTasks)
+        .askSync[Unit](SetReaderPartitions(readerFactories.size()))
+      new ContinuousDataSourceRDD(sparkContext, sqlContext, readerFactories)
         .asInstanceOf[RDD[InternalRow]]
 
     case _ =>
-      new DataSourceRDD(sparkContext, readTasks).asInstanceOf[RDD[InternalRow]]
+      new DataSourceRDD(sparkContext, readerFactories).asInstanceOf[RDD[InternalRow]]
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
@@ -99,14 +100,14 @@ case class DataSourceV2ScanExec(
   }
 }
 
-class RowToUnsafeRowReadTask(rowReadTask: ReadTask[Row], schema: StructType)
-  extends ReadTask[UnsafeRow] {
+class RowToUnsafeRowDataReaderFactory(rowReaderFactory: DataReaderFactory[Row], schema: StructType)
+  extends DataReaderFactory[UnsafeRow] {
 
-  override def preferredLocations: Array[String] = rowReadTask.preferredLocations
+  override def preferredLocations: Array[String] = rowReaderFactory.preferredLocations
 
   override def createDataReader: DataReader[UnsafeRow] = {
     new RowToUnsafeDataReader(
-      rowReadTask.createDataReader, RowEncoder.apply(schema).resolveAndBind())
+      rowReaderFactory.createDataReader, RowEncoder.apply(schema).resolveAndBind())
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
index cd7065f..8a7a38b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
@@ -39,15 +39,15 @@ import org.apache.spark.util.{SystemClock, ThreadUtils}
 class ContinuousDataSourceRDD(
     sc: SparkContext,
     sqlContext: SQLContext,
-    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+    @transient private val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]])
   extends RDD[UnsafeRow](sc, Nil) {
 
   private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
   private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
 
   override protected def getPartitions: Array[Partition] = {
-    readTasks.asScala.zipWithIndex.map {
-      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+    readerFactories.asScala.zipWithIndex.map {
+      case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
     }.toArray
   }
 
@@ -57,7 +57,8 @@ class ContinuousDataSourceRDD(
       throw new ContinuousTaskRetryException()
     }
 
-    val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]].readTask.createDataReader()
+    val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]]
+      .readerFactory.createDataReader()
 
     val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
 
@@ -136,7 +137,7 @@ class ContinuousDataSourceRDD(
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]].readTask.preferredLocations()
+    split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]].readerFactory.preferredLocations()
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index b4b21e7..6130448 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -68,7 +68,7 @@ class RateStreamContinuousReader(options: DataSourceV2Options)
 
   override def getStartOffset(): Offset = offset
 
-  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] = {
     val partitionStartMap = offset match {
       case off: RateStreamOffset => off.partitionToValueAndRunTimeMs
       case off =>
@@ -86,13 +86,13 @@ class RateStreamContinuousReader(options: DataSourceV2Options)
       val start = partitionStartMap(i)
       // Have each partition advance by numPartitions each row, with starting points staggered
       // by their partition index.
-      RateStreamContinuousReadTask(
+      RateStreamContinuousDataReaderFactory(
         start.value,
         start.runTimeMs,
         i,
         numPartitions,
         perPartitionRate)
-        .asInstanceOf[ReadTask[Row]]
+        .asInstanceOf[DataReaderFactory[Row]]
     }.asJava
   }
 
@@ -101,13 +101,13 @@ class RateStreamContinuousReader(options: DataSourceV2Options)
 
 }
 
-case class RateStreamContinuousReadTask(
+case class RateStreamContinuousDataReaderFactory(
     startValue: Long,
     startTimeMs: Long,
     partitionIndex: Int,
     increment: Long,
     rowsPerSecond: Double)
-  extends ReadTask[Row] {
+  extends DataReaderFactory[Row] {
   override def createDataReader(): DataReader[Row] =
     new RateStreamContinuousDataReader(
       startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
index c0ed12c..a25cc4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
@@ -123,7 +123,7 @@ class RateStreamMicroBatchReader(options: DataSourceV2Options)
     RateStreamOffset(Serialization.read[Map[Int, ValueRunTimeMsPair]](json))
   }
 
-  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] = {
     val startMap = start.partitionToValueAndRunTimeMs
     val endMap = end.partitionToValueAndRunTimeMs
     endMap.keys.toSeq.map { part =>
@@ -139,7 +139,7 @@ class RateStreamMicroBatchReader(options: DataSourceV2Options)
         outTimeMs += msPerPartitionBetweenRows
       }
 
-      RateStreamBatchTask(packedRows).asInstanceOf[ReadTask[Row]]
+      RateStreamBatchTask(packedRows).asInstanceOf[DataReaderFactory[Row]]
     }.toList.asJava
   }
 
@@ -147,7 +147,7 @@ class RateStreamMicroBatchReader(options: DataSourceV2Options)
   override def stop(): Unit = {}
 }
 
-case class RateStreamBatchTask(vals: Seq[(Long, Long)]) extends ReadTask[Row] {
+case class RateStreamBatchTask(vals: Seq[(Long, Long)]) extends DataReaderFactory[Row] {
   override def createDataReader(): DataReader[Row] = new RateStreamBatchReader(vals)
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
index 1cfdc08..4026ee4 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
@@ -60,8 +60,8 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<ReadTask<Row>> createReadTasks() {
-      List<ReadTask<Row>> res = new ArrayList<>();
+    public List<DataReaderFactory<Row>> createDataReaderFactories() {
+      List<DataReaderFactory<Row>> res = new ArrayList<>();
 
       Integer lowerBound = null;
       for (Filter filter : filters) {
@@ -75,25 +75,25 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
       }
 
       if (lowerBound == null) {
-        res.add(new JavaAdvancedReadTask(0, 5, requiredSchema));
-        res.add(new JavaAdvancedReadTask(5, 10, requiredSchema));
+        res.add(new JavaAdvancedDataReaderFactory(0, 5, requiredSchema));
+        res.add(new JavaAdvancedDataReaderFactory(5, 10, requiredSchema));
       } else if (lowerBound < 4) {
-        res.add(new JavaAdvancedReadTask(lowerBound + 1, 5, requiredSchema));
-        res.add(new JavaAdvancedReadTask(5, 10, requiredSchema));
+        res.add(new JavaAdvancedDataReaderFactory(lowerBound + 1, 5, requiredSchema));
+        res.add(new JavaAdvancedDataReaderFactory(5, 10, requiredSchema));
       } else if (lowerBound < 9) {
-        res.add(new JavaAdvancedReadTask(lowerBound + 1, 10, requiredSchema));
+        res.add(new JavaAdvancedDataReaderFactory(lowerBound + 1, 10, requiredSchema));
       }
 
       return res;
     }
   }
 
-  static class JavaAdvancedReadTask implements ReadTask<Row>, DataReader<Row> {
+  static class JavaAdvancedDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
     private int start;
     private int end;
     private StructType requiredSchema;
 
-    JavaAdvancedReadTask(int start, int end, StructType requiredSchema) {
+    JavaAdvancedDataReaderFactory(int start, int end, StructType requiredSchema) {
       this.start = start;
       this.end = end;
       this.requiredSchema = requiredSchema;
@@ -101,7 +101,7 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
 
     @Override
     public DataReader<Row> createDataReader() {
-      return new JavaAdvancedReadTask(start - 1, end, requiredSchema);
+      return new JavaAdvancedDataReaderFactory(start - 1, end, requiredSchema);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
index a5d77a9..34e6c63 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
@@ -42,12 +42,14 @@ public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<ReadTask<ColumnarBatch>> createBatchReadTasks() {
-      return java.util.Arrays.asList(new JavaBatchReadTask(0, 50), new JavaBatchReadTask(50, 90));
+    public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
+      return java.util.Arrays.asList(
+               new JavaBatchDataReaderFactory(0, 50), new JavaBatchDataReaderFactory(50, 90));
     }
   }
 
-  static class JavaBatchReadTask implements ReadTask<ColumnarBatch>, DataReader<ColumnarBatch> {
+  static class JavaBatchDataReaderFactory
+      implements DataReaderFactory<ColumnarBatch>, DataReader<ColumnarBatch> {
     private int start;
     private int end;
 
@@ -57,7 +59,7 @@ public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
     private OnHeapColumnVector j;
     private ColumnarBatch batch;
 
-    JavaBatchReadTask(int start, int end) {
+    JavaBatchDataReaderFactory(int start, int end) {
       this.start = start;
       this.end = end;
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
index 806d0bc..d0c8750 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
@@ -40,10 +40,10 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<ReadTask<Row>> createReadTasks() {
+    public List<DataReaderFactory<Row>> createDataReaderFactories() {
       return java.util.Arrays.asList(
-        new SpecificReadTask(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
-        new SpecificReadTask(new int[]{2, 4, 4}, new int[]{6, 2, 2}));
+        new SpecificDataReaderFactory(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
+        new SpecificDataReaderFactory(new int[]{2, 4, 4}, new int[]{6, 2, 2}));
     }
 
     @Override
@@ -70,12 +70,12 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
     }
   }
 
-  static class SpecificReadTask implements ReadTask<Row>, DataReader<Row> {
+  static class SpecificDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
     private int[] i;
     private int[] j;
     private int current = -1;
 
-    SpecificReadTask(int[] i, int[] j) {
+    SpecificDataReaderFactory(int[] i, int[] j) {
       assert i.length == j.length;
       this.i = i;
       this.j = j;

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
index a174bd8..f997366 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
@@ -24,7 +24,7 @@ import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.DataSourceV2Options;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
 import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
-import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
@@ -42,7 +42,7 @@ public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWi
     }
 
     @Override
-    public List<ReadTask<Row>> createReadTasks() {
+    public List<DataReaderFactory<Row>> createDataReaderFactories() {
       return java.util.Collections.emptyList();
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
index 2d458b7..2beed43 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
@@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.DataSourceV2Options;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.DataReader;
-import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
 import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
 import org.apache.spark.sql.types.StructType;
 
@@ -41,25 +41,25 @@ public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<ReadTask<Row>> createReadTasks() {
+    public List<DataReaderFactory<Row>> createDataReaderFactories() {
       return java.util.Arrays.asList(
-        new JavaSimpleReadTask(0, 5),
-        new JavaSimpleReadTask(5, 10));
+        new JavaSimpleDataReaderFactory(0, 5),
+        new JavaSimpleDataReaderFactory(5, 10));
     }
   }
 
-  static class JavaSimpleReadTask implements ReadTask<Row>, DataReader<Row> {
+  static class JavaSimpleDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
     private int start;
     private int end;
 
-    JavaSimpleReadTask(int start, int end) {
+    JavaSimpleDataReaderFactory(int start, int end) {
       this.start = start;
       this.end = end;
     }
 
     @Override
     public DataReader<Row> createDataReader() {
-      return new JavaSimpleReadTask(start - 1, end);
+      return new JavaSimpleDataReaderFactory(start - 1, end);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
index f6aa008..e818752 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
@@ -38,19 +38,20 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks() {
+    public List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories() {
       return java.util.Arrays.asList(
-        new JavaUnsafeRowReadTask(0, 5),
-        new JavaUnsafeRowReadTask(5, 10));
+        new JavaUnsafeRowDataReaderFactory(0, 5),
+        new JavaUnsafeRowDataReaderFactory(5, 10));
     }
   }
 
-  static class JavaUnsafeRowReadTask implements ReadTask<UnsafeRow>, DataReader<UnsafeRow> {
+  static class JavaUnsafeRowDataReaderFactory
+      implements DataReaderFactory<UnsafeRow>, DataReader<UnsafeRow> {
     private int start;
     private int end;
     private UnsafeRow row;
 
-    JavaUnsafeRowReadTask(int start, int end) {
+    JavaUnsafeRowDataReaderFactory(int start, int end) {
       this.start = start;
       this.end = end;
       this.row = new UnsafeRow(2);
@@ -59,7 +60,7 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
 
     @Override
     public DataReader<UnsafeRow> createDataReader() {
-      return new JavaUnsafeRowReadTask(start - 1, end);
+      return new JavaUnsafeRowDataReaderFactory(start - 1, end);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
index 85085d4..d2cfe79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
@@ -78,7 +78,7 @@ class RateSourceV2Suite extends StreamTest {
     val reader = new RateStreamMicroBatchReader(
       new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
     reader.setOffsetRange(Optional.empty(), Optional.empty())
-    val tasks = reader.createReadTasks()
+    val tasks = reader.createDataReaderFactories()
     assert(tasks.size == 11)
   }
 
@@ -118,7 +118,7 @@ class RateSourceV2Suite extends StreamTest {
     val startOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 1000))))
     val endOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(20, 2000))))
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    val tasks = reader.createReadTasks()
+    val tasks = reader.createDataReaderFactories()
     assert(tasks.size == 1)
     assert(tasks.get(0).asInstanceOf[RateStreamBatchTask].vals.size == 20)
   }
@@ -133,7 +133,7 @@ class RateSourceV2Suite extends StreamTest {
     }.toMap)
 
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    val tasks = reader.createReadTasks()
+    val tasks = reader.createDataReaderFactories()
     assert(tasks.size == 11)
 
     val readData = tasks.asScala
@@ -161,12 +161,12 @@ class RateSourceV2Suite extends StreamTest {
     val reader = new RateStreamContinuousReader(
       new DataSourceV2Options(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
     reader.setOffset(Optional.empty())
-    val tasks = reader.createReadTasks()
+    val tasks = reader.createDataReaderFactories()
     assert(tasks.size == 2)
 
     val data = scala.collection.mutable.ListBuffer[Row]()
     tasks.asScala.foreach {
-      case t: RateStreamContinuousReadTask =>
+      case t: RateStreamContinuousDataReaderFactory =>
         val startTimeMs = reader.getStartOffset()
           .asInstanceOf[RateStreamOffset]
           .partitionToValueAndRunTimeMs(t.partitionIndex)

http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 0620693..42c5d3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -204,18 +204,20 @@ class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceV2Reader {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createReadTasks(): JList[ReadTask[Row]] = {
-      java.util.Arrays.asList(new SimpleReadTask(0, 5), new SimpleReadTask(5, 10))
+    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+      java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5), new SimpleDataReaderFactory(5, 10))
     }
   }
 
   override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
 }
 
-class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] with DataReader[Row] {
+class SimpleDataReaderFactory(start: Int, end: Int)
+  extends DataReaderFactory[Row]
+  with DataReader[Row] {
   private var current = start - 1
 
-  override def createDataReader(): DataReader[Row] = new SimpleReadTask(start, end)
+  override def createDataReader(): DataReader[Row] = new SimpleDataReaderFactory(start, end)
 
   override def next(): Boolean = {
     current += 1
@@ -252,21 +254,21 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
       requiredSchema
     }
 
-    override def createReadTasks(): JList[ReadTask[Row]] = {
+    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
       val lowerBound = filters.collect {
         case GreaterThan("i", v: Int) => v
       }.headOption
 
-      val res = new ArrayList[ReadTask[Row]]
+      val res = new ArrayList[DataReaderFactory[Row]]
 
       if (lowerBound.isEmpty) {
-        res.add(new AdvancedReadTask(0, 5, requiredSchema))
-        res.add(new AdvancedReadTask(5, 10, requiredSchema))
+        res.add(new AdvancedDataReaderFactory(0, 5, requiredSchema))
+        res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
       } else if (lowerBound.get < 4) {
-        res.add(new AdvancedReadTask(lowerBound.get + 1, 5, requiredSchema))
-        res.add(new AdvancedReadTask(5, 10, requiredSchema))
+        res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 5, requiredSchema))
+        res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
       } else if (lowerBound.get < 9) {
-        res.add(new AdvancedReadTask(lowerBound.get + 1, 10, requiredSchema))
+        res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 10, requiredSchema))
       }
 
       res
@@ -276,13 +278,13 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
   override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
 }
 
-class AdvancedReadTask(start: Int, end: Int, requiredSchema: StructType)
-  extends ReadTask[Row] with DataReader[Row] {
+class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType)
+  extends DataReaderFactory[Row] with DataReader[Row] {
 
   private var current = start - 1
 
   override def createDataReader(): DataReader[Row] = {
-    new AdvancedReadTask(start, end, requiredSchema)
+    new AdvancedDataReaderFactory(start, end, requiredSchema)
   }
 
   override def close(): Unit = {}
@@ -307,16 +309,17 @@ class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceV2Reader with SupportsScanUnsafeRow {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createUnsafeRowReadTasks(): JList[ReadTask[UnsafeRow]] = {
-      java.util.Arrays.asList(new UnsafeRowReadTask(0, 5), new UnsafeRowReadTask(5, 10))
+    override def createUnsafeRowReaderFactories(): JList[DataReaderFactory[UnsafeRow]] = {
+      java.util.Arrays.asList(new UnsafeRowDataReaderFactory(0, 5),
+        new UnsafeRowDataReaderFactory(5, 10))
     }
   }
 
   override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
 }
 
-class UnsafeRowReadTask(start: Int, end: Int)
-  extends ReadTask[UnsafeRow] with DataReader[UnsafeRow] {
+class UnsafeRowDataReaderFactory(start: Int, end: Int)
+  extends DataReaderFactory[UnsafeRow] with DataReader[UnsafeRow] {
 
   private val row = new UnsafeRow(2)
   row.pointTo(new Array[Byte](8 * 3), 8 * 3)
@@ -341,7 +344,7 @@ class UnsafeRowReadTask(start: Int, end: Int)
 class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
 
   class Reader(val readSchema: StructType) extends DataSourceV2Reader {
-    override def createReadTasks(): JList[ReadTask[Row]] =
+    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
       java.util.Collections.emptyList()
   }
 
@@ -354,16 +357,16 @@ class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceV2Reader with SupportsScanColumnarBatch {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createBatchReadTasks(): JList[ReadTask[ColumnarBatch]] = {
-      java.util.Arrays.asList(new BatchReadTask(0, 50), new BatchReadTask(50, 90))
+    override def createBatchDataReaderFactories(): JList[DataReaderFactory[ColumnarBatch]] = {
+      java.util.Arrays.asList(new BatchDataReaderFactory(0, 50), new BatchDataReaderFactory(50, 90))
     }
   }
 
   override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
 }
 
-class BatchReadTask(start: Int, end: Int)
-  extends ReadTask[ColumnarBatch] with DataReader[ColumnarBatch] {
+class BatchDataReaderFactory(start: Int, end: Int)
+  extends DataReaderFactory[ColumnarBatch] with DataReader[ColumnarBatch] {
 
   private final val BATCH_SIZE = 20
   private lazy val i = new OnHeapColumnVector(BATCH_SIZE, IntegerType)
@@ -406,11 +409,11 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceV2Reader with SupportsReportPartitioning {
     override def readSchema(): StructType = new StructType().add("a", "int").add("b", "int")
 
-    override def createReadTasks(): JList[ReadTask[Row]] = {
+    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
       // Note that we don't have same value of column `a` across partitions.
       java.util.Arrays.asList(
-        new SpecificReadTask(Array(1, 1, 3), Array(4, 4, 6)),
-        new SpecificReadTask(Array(2, 4, 4), Array(6, 2, 2)))
+        new SpecificDataReaderFactory(Array(1, 1, 3), Array(4, 4, 6)),
+        new SpecificDataReaderFactory(Array(2, 4, 4), Array(6, 2, 2)))
     }
 
     override def outputPartitioning(): Partitioning = new MyPartitioning
@@ -428,7 +431,9 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
   override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
 }
 
-class SpecificReadTask(i: Array[Int], j: Array[Int]) extends ReadTask[Row] with DataReader[Row] {
+class SpecificDataReaderFactory(i: Array[Int], j: Array[Int])
+  extends DataReaderFactory[Row]
+  with DataReader[Row] {
   assert(i.length == j.length)
 
   private var current = -1

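Apart from the renames, nothing changes in how these test sources are consumed. A hedged usage sketch, assuming an active SparkSession named `spark`: the suite loads a v2 source by class name, and each DataReaderFactory becomes one input partition of the resulting scan.

    val df = spark.read
      .format(classOf[SimpleDataSourceV2].getName)
      .load()
    // collect() materializes the rows produced by the two factories above,
    // SimpleDataReaderFactory(0, 5) and SimpleDataReaderFactory(5, 10).
    df.collect()
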
http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index cd7252e..3310d6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataSourceV2Reader, ReadTask}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceV2Reader}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -45,7 +45,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
   class Reader(path: String, conf: Configuration) extends DataSourceV2Reader {
     override def readSchema(): StructType = schema
 
-    override def createReadTasks(): JList[ReadTask[Row]] = {
+    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
       val dataPath = new Path(path)
       val fs = dataPath.getFileSystem(conf)
       if (fs.exists(dataPath)) {
@@ -54,7 +54,9 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
           name.startsWith("_") || name.startsWith(".")
         }.map { f =>
           val serializableConf = new SerializableConfiguration(conf)
-          new SimpleCSVReadTask(f.getPath.toUri.toString, serializableConf): ReadTask[Row]
+          new SimpleCSVDataReaderFactory(
+            f.getPath.toUri.toString,
+            serializableConf): DataReaderFactory[Row]
         }.toList.asJava
       } else {
         Collections.emptyList()
@@ -149,8 +151,8 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
   }
 }
 
-class SimpleCSVReadTask(path: String, conf: SerializableConfiguration)
-  extends ReadTask[Row] with DataReader[Row] {
+class SimpleCSVDataReaderFactory(path: String, conf: SerializableConfiguration)
+  extends DataReaderFactory[Row] with DataReader[Row] {
 
   @transient private var lines: Iterator[String] = _
   @transient private var currentLine: String = _

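One design point the rename preserves: the factory, not the reader, is what gets serialized and shipped to executors, which is why SimpleCSVDataReaderFactory wraps its Hadoop configuration in SerializableConfiguration. A skeletal, assumption-labelled sketch of that shape follows; the class name PathBackedDataReaderFactory is hypothetical and the reading logic is stubbed out.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    import org.apache.spark.util.SerializableConfiguration

    // Hypothetical skeleton: only the serialization-relevant structure is shown.
    class PathBackedDataReaderFactory(path: String, conf: SerializableConfiguration)
      extends DataReaderFactory[Row] with DataReader[Row] {

      // The factory is created on the driver and deserialized on an executor,
      // where it builds the actual DataReader.
      override def createDataReader(): DataReader[Row] =
        new PathBackedDataReaderFactory(path, conf)

      override def next(): Boolean = false                    // stub: yields no rows
      override def get(): Row = throw new NoSuchElementException("stub reader is empty")
      override def close(): Unit = {}
    }
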
http://git-wip-us.apache.org/repos/asf/spark/blob/de66abaf/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index d4f8bae..dc8c857 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.reader.ReadTask
+import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
 import org.apache.spark.sql.sources.v2.streaming._
 import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
 import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
@@ -45,7 +45,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader {
   def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
   def setOffset(start: Optional[Offset]): Unit = {}
 
-  def createReadTasks(): java.util.ArrayList[ReadTask[Row]] = {
+  def createDataReaderFactories(): java.util.ArrayList[DataReaderFactory[Row]] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
 }

