spark-commits mailing list archives

From zsxw...@apache.org
Subject spark git commit: [SPARK-19140][SS] Allow update mode for non-aggregation streaming queries
Date Wed, 11 Jan 2017 01:58:28 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 81c943090 -> 230607d62


[SPARK-19140][SS] Allow update mode for non-aggregation streaming queries

## What changes were proposed in this pull request?

This PR allows update mode for non-aggregation streaming queries. It will be the same as append mode if a query has no aggregations.
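
For illustration, a minimal PySpark sketch of the behavior this enables (the socket source, host/port, and names are placeholder assumptions, not part of this patch):

```python
# Hypothetical sketch: update mode on a query with no aggregations.
# Before this patch, starting it raised an AnalysisException; after it,
# the behavior is the same as append mode.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("update-without-agg").getOrCreate()

# Placeholder source; any streaming source works. Note: no groupBy/agg.
lines = spark.readStream.format("socket") \
    .option("host", "localhost").option("port", 9999).load()

query = lines.where("value != ''").writeStream \
    .format("console") \
    .outputMode("update") \
    .start()
```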

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16520 from zsxwing/update-without-agg.

(cherry picked from commit bc6c56e940fe93591a1e5ba45751f1b243b57e28)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/230607d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/230607d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/230607d6

Branch: refs/heads/branch-2.1
Commit: 230607d62493c36b214c01a70aa9b0dbb3a9ad4d
Parents: 81c9430
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Tue Jan 10 17:58:11 2017 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Tue Jan 10 17:58:23 2017 -0800

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md  |  4 +--
 python/pyspark/sql/streaming.py                 | 27 ++++++++++-----
 .../apache/spark/sql/streaming/OutputMode.java  |  3 +-
 .../analysis/UnsupportedOperationChecker.scala  |  2 +-
 .../streaming/InternalOutputModes.scala         |  4 +--
 .../analysis/UnsupportedOperationsSuite.scala   | 31 +++++++++--------
 .../spark/sql/streaming/DataStreamWriter.scala  | 18 ++++------
 .../execution/streaming/MemorySinkSuite.scala   | 35 ++++++++++++--------
 8 files changed, 72 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/230607d6/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 473a196..45ee551 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou
 
   - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
   
-  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger.
+  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be equivalent to Append mode.
 
 Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes).
 
@@ -977,7 +977,7 @@ Here is the compatibility matrix.
   </tr>
   <tr>
     <td colspan="2" style="vertical-align: middle;">Queries without aggregation</td>
-    <td style="vertical-align: middle;">Append</td>
+    <td style="vertical-align: middle;">Append, Update</td>
     <td style="vertical-align: middle;">
         Complete mode not supported as it is infeasible to keep all data in the Result Table.
     </td>

http://git-wip-us.apache.org/repos/asf/spark/blob/230607d6/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 5014299..a10b185 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -665,6 +665,9 @@ class DataStreamWriter(object):
            the sink
         * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
            every time these is some updates
+        * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be
+           written to the sink every time there are some updates. If the query doesn't contain
+           aggregations, it will be equivalent to `append` mode.
 
        .. note:: Experimental.
 
@@ -768,7 +771,8 @@ class DataStreamWriter(object):
 
     @ignore_unicode_prefix
     @since(2.0)
-    def start(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+    def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None,
+              **options):
         """Streams the contents of the :class:`DataFrame` to a data source.
 
         The data source is specified by the ``format`` and a set of ``options``.
@@ -779,15 +783,20 @@ class DataStreamWriter(object):
 
         :param path: the path in a Hadoop supported file system
         :param format: the format used to save
-
-            * ``append``: Append contents of this :class:`DataFrame` to existing data.
-            * ``overwrite``: Overwrite existing data.
-            * ``ignore``: Silently ignore this operation if data already exists.
-            * ``error`` (default case): Throw an exception if data already exists.
+        :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a
+                           streaming sink.
+
+            * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to the
+              sink
+            * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
+               every time these is some updates
+            * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be
+              written to the sink every time there are some updates. If the query doesn't contain
+              aggregations, it will be equivalent to `append` mode.
         :param partitionBy: names of partitioning columns
         :param queryName: unique name for the query
         :param options: All other string options. You may want to provide a `checkpointLocation`
-            for most streams, however it is not required for a `memory` stream.
+                        for most streams, however it is not required for a `memory` stream.
 
         >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
         >>> sq.isActive
@@ -798,7 +807,7 @@ class DataStreamWriter(object):
         >>> sq.isActive
         False
         >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
-        ...     queryName='that_query', format='memory')
+        ...     queryName='that_query', outputMode="append", format='memory')
         >>> sq.name
         u'that_query'
         >>> sq.isActive
@@ -806,6 +815,8 @@ class DataStreamWriter(object):
         >>> sq.stop()
         """
         self.options(**options)
+        if outputMode is not None:
+            self.outputMode(outputMode)
         if partitionBy is not None:
             self.partitionBy(partitionBy)
         if format is not None:
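
As a hedged usage sketch of the new `outputMode` keyword (assuming `sdf` is a streaming DataFrame, as in the doctests above):

```python
# outputMode can now be passed directly to start(); per the diff above,
# this is equivalent to calling .outputMode('update') before start().
sq = sdf.writeStream.start(format='memory', outputMode='update',
                           queryName='update_query')
sq.stop()
```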

http://git-wip-us.apache.org/repos/asf/spark/blob/230607d6/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
index cf0579f..3f7cdb2 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
@@ -57,7 +57,8 @@ public class OutputMode {
 
   /**
    * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will
-   * be written to the sink every time there are some updates.
+   * be written to the sink every time there are some updates. If the query doesn't contain
+   * aggregations, it will be equivalent to `Append` mode.
    *
    * @since 2.1.1
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/230607d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 053c8eb..c2666b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -73,7 +73,7 @@ object UnsupportedOperationChecker {
                 s"streaming DataFrames/DataSets")(plan)
         }
 
-      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
+      case InternalOutputModes.Complete if aggregates.isEmpty =>
         throwError(
           s"$outputMode output mode not supported when there are no streaming aggregations on " +
             s"streaming DataFrames/Datasets")(plan)

http://git-wip-us.apache.org/repos/asf/spark/blob/230607d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
index 915f4a9..351bd6f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
@@ -40,8 +40,8 @@ private[sql] object InternalOutputModes {
 
   /**
    * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
-   * written to the sink every time these is some updates. This output mode can only be used in
-   * queries that contain aggregations.
+   * written to the sink every time these is some updates. If the query doesn't contain
+   * aggregations, it will be equivalent to `Append` mode.
    */
   case object Update extends OutputMode
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/230607d6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index d2c0f8c..58e69f9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -219,9 +219,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
 
   // Output modes with aggregation and non-aggregation plans
-  testOutputMode(Append, shouldSupportAggregation = false)
-  testOutputMode(Update, shouldSupportAggregation = true)
-  testOutputMode(Complete, shouldSupportAggregation = true)
+  testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true)
+  testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true)
+  testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false)
 
   /*
     =======================================================================================
@@ -323,30 +323,33 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   /** Test output mode with and without aggregation in the streaming plan */
   def testOutputMode(
       outputMode: OutputMode,
-      shouldSupportAggregation: Boolean): Unit = {
+      shouldSupportAggregation: Boolean,
+      shouldSupportNonAggregation: Boolean): Unit = {
 
     // aggregation
     if (shouldSupportAggregation) {
-      assertNotSupportedInStreamingPlan(
-        s"$outputMode output mode - no aggregation",
-        streamRelation.where($"a" > 1),
-        outputMode = outputMode,
-        Seq("aggregation", s"$outputMode output mode"))
-
       assertSupportedInStreamingPlan(
         s"$outputMode output mode - aggregation",
         streamRelation.groupBy("a")("count(*)"),
         outputMode = outputMode)
-
     } else {
+      assertNotSupportedInStreamingPlan(
+        s"$outputMode output mode - aggregation",
+        streamRelation.groupBy("a")("count(*)"),
+        outputMode = outputMode,
+        Seq("aggregation", s"$outputMode output mode"))
+    }
+
+    // non aggregation
+    if (shouldSupportNonAggregation) {
       assertSupportedInStreamingPlan(
         s"$outputMode output mode - no aggregation",
         streamRelation.where($"a" > 1),
         outputMode = outputMode)
-
+    } else {
       assertNotSupportedInStreamingPlan(
-        s"$outputMode output mode - aggregation",
-        streamRelation.groupBy("a")("count(*)"),
+        s"$outputMode output mode - no aggregation",
+        streamRelation.where($"a" > 1),
         outputMode = outputMode,
         Seq("aggregation", s"$outputMode output mode"))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/230607d6/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index bf25b48..5f49bef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -44,6 +44,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *                            written to the sink
    *   - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
    *                              to the sink every time these is some updates
+   *   - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset
+   *                            will be written to the sink every time there are some updates. If
+   *                            the query doesn't contain aggregations, it will be equivalent to
+   *                            `OutputMode.Append()` mode.
    *
    * @since 2.0.0
    */
@@ -58,7 +62,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *                 the sink
    *   - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
    *                 every time these is some updates
-   *
+   *   - `update`:   only the rows that were updated in the streaming DataFrame/Dataset will
+   *                 be written to the sink every time there are some updates. If the query doesn't
+   *                 contain aggregations, it will be equivalent to `append` mode.
    * @since 2.0.0
    */
   def outputMode(outputMode: String): DataStreamWriter[T] = {
@@ -220,16 +226,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       if (extraOptions.get("queryName").isEmpty) {
         throw new AnalysisException("queryName must be specified for memory sink")
       }
-      val supportedModes = "Output modes supported by the memory sink are 'append' and 'complete'."
-      outputMode match {
-        case Append | Complete => // allowed
-        case Update =>
-          throw new AnalysisException(
-            s"Update output mode is not supported for memory sink. $supportedModes")
-        case _ =>
-          throw new AnalysisException(
-            s"$outputMode is not supported for memory sink. $supportedModes")
-      }
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
       val chkpointLoc = extraOptions.get("checkpointLocation")

http://git-wip-us.apache.org/repos/asf/spark/blob/230607d6/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
index ca724fc..8f23f98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
@@ -137,7 +137,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
   }
 
 
-  test("registering as a table in Append output mode - supported") {
+  test("registering as a table in Append output mode") {
     val input = MemoryStream[Int]
     val query = input.toDF().writeStream
       .format("memory")
@@ -160,7 +160,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
-  test("registering as a table in Complete output mode - supported") {
+  test("registering as a table in Complete output mode") {
     val input = MemoryStream[Int]
     val query = input.toDF()
       .groupBy("value")
@@ -186,18 +186,27 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
-  test("registering as a table in Update output mode - not supported") {
+  test("registering as a table in Update output mode") {
     val input = MemoryStream[Int]
-    val df = input.toDF()
-      .groupBy("value")
-      .count()
-    intercept[AnalysisException] {
-      df.writeStream
-        .format("memory")
-        .outputMode("update")
-        .queryName("memStream")
-        .start()
-    }
+    val query = input.toDF().writeStream
+      .format("memory")
+      .outputMode("update")
+      .queryName("memStream")
+      .start()
+    input.addData(1, 2, 3)
+    query.processAllAvailable()
+
+    checkDataset(
+      spark.table("memStream").as[Int],
+      1, 2, 3)
+
+    input.addData(4, 5, 6)
+    query.processAllAvailable()
+    checkDataset(
+      spark.table("memStream").as[Int],
+      1, 2, 3, 4, 5, 6)
+
+    query.stop()
   }
 
   test("MemoryPlan statistics") {


