spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-18234][SS] Made update mode public
Date Thu, 22 Dec 2016 00:43:21 GMT
Repository: spark
Updated Branches:
  refs/heads/master afd9bc1d8 -> 83a6ace0d


[SPARK-18234][SS] Made update mode public

## What changes were proposed in this pull request?

Made update mode public. As part of that, the following changes were made:
- Updated DataStreamWriter to accept "update" as an output mode
- Moved InternalOutputModes from package o.a.s.sql to o.a.s.sql.catalyst.streaming
- Added watermark-based removal of old state in Update mode to StateStoreSaveExec

## How was this patch tested?

Added new tests in the changed modules.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16360 from tdas/SPARK-18234.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83a6ace0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83a6ace0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83a6ace0

Branch: refs/heads/master
Commit: 83a6ace0d1be44f70e768348ae6688798c84343e
Parents: afd9bc1
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Wed Dec 21 16:43:17 2016 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Wed Dec 21 16:43:17 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/streaming/OutputMode.java  |  12 +-
 .../apache/spark/sql/InternalOutputModes.scala  |  47 ---
 .../analysis/UnsupportedOperationChecker.scala  |   3 +-
 .../streaming/InternalOutputModes.scala         |  47 +++
 .../analysis/UnsupportedOperationsSuite.scala   |   2 +-
 .../sql/execution/datasources/DataSource.scala  |   2 +-
 .../execution/streaming/StatefulAggregate.scala |  61 ++--
 .../spark/sql/execution/streaming/memory.scala  |   5 +-
 .../spark/sql/streaming/DataStreamWriter.scala  |  17 +-
 .../execution/streaming/MemorySinkSuite.scala   | 287 +++++++++++++++++++
 .../sql/streaming/EventTimeWatermarkSuite.scala |  55 +++-
 .../sql/streaming/FileStreamSinkSuite.scala     |  22 +-
 .../sql/streaming/FileStreamSourceSuite.scala   |   2 +-
 .../spark/sql/streaming/MemorySinkSuite.scala   | 274 ------------------
 .../spark/sql/streaming/StreamSuite.scala       |   8 +-
 .../streaming/StreamingAggregationSuite.scala   |   2 +-
 .../test/DataStreamReaderWriterSuite.scala      |  38 ++-
 17 files changed, 507 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
index a515c1a..cf0579f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming;
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.InternalOutputModes;
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes;
 
 /**
  * :: Experimental ::
@@ -54,4 +54,14 @@ public class OutputMode {
   public static OutputMode Complete() {
     return InternalOutputModes.Complete$.MODULE$;
   }
+
+  /**
+   * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will
+   * be written to the sink every time there are some updates.
+   *
+   * @since 2.1.1
+   */
+  public static OutputMode Update() {
+    return InternalOutputModes.Update$.MODULE$;
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
deleted file mode 100644
index 594c41c..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.sql.streaming.OutputMode
-
-/**
- * Internal helper class to generate objects representing various `OutputMode`s,
- */
-private[sql] object InternalOutputModes {
-
-  /**
-   * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be
-   * written to the sink. This output mode can be only be used in queries that do not
-   * contain any aggregation.
-   */
-  case object Append extends OutputMode
-
-  /**
-   * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written
-   * to the sink every time these is some updates. This output mode can only be used in queries
-   * that contain aggregations.
-   */
-  case object Complete extends OutputMode
-
-  /**
-   * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
-   * written to the sink every time these is some updates. This output mode can only be used in
-   * queries that contain aggregations.
-   */
-  case object Update extends OutputMode
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index c4a78f9..60d9881 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.streaming.OutputMode
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
new file mode 100644
index 0000000..915f4a9
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.streaming
+
+import org.apache.spark.sql.streaming.OutputMode
+
+/**
+ * Internal helper class to generate objects representing various `OutputMode`s,
+ */
+private[sql] object InternalOutputModes {
+
+  /**
+   * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be
+   * written to the sink. This output mode can be only be used in queries that do not
+   * contain any aggregation.
+   */
+  case object Append extends OutputMode
+
+  /**
+   * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written
+   * to the sink every time these is some updates. This output mode can only be used in queries
+   * that contain aggregations.
+   */
+  case object Complete extends OutputMode
+
+  /**
+   * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
+   * written to the sink every time these is some updates. This output mode can only be used in
+   * queries that contain aggregations.
+   */
+  case object Update extends OutputMode
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 34e94c7..94a008f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.InternalOutputModes._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -27,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.IntegerType
 

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5245c14..ac3f068 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -278,7 +278,7 @@ case class DataSource(
           throw new IllegalArgumentException("'path' is not specified")
         })
         if (outputMode != OutputMode.Append) {
-          throw new IllegalArgumentException(
+          throw new AnalysisException(
             s"Data source $className does not support $outputMode output mode")
         }
         new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, caseInsensitiveOptions)

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 7af978a..0551e4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -21,11 +21,11 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratePredicate, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, Predicate}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution
-import org.apache.spark.sql.InternalOutputModes._
-import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.execution.SparkPlan
@@ -108,6 +108,30 @@ case class StateStoreSaveExec(
     "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
     "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"))
 
+  /** Generate a predicate that matches data older than the watermark */
+  private lazy val watermarkPredicate: Option[Predicate] = {
+    val optionalWatermarkAttribute =
+      keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey))
+
+    optionalWatermarkAttribute.map { watermarkAttribute =>
+      // If we are evicting based on a window, use the end of the window.  Otherwise just
+      // use the attribute itself.
+      val evictionExpression =
+        if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
+          LessThanOrEqual(
+            GetStructField(watermarkAttribute, 1),
+            Literal(eventTimeWatermark.get * 1000))
+        } else {
+          LessThanOrEqual(
+            watermarkAttribute,
+            Literal(eventTimeWatermark.get * 1000))
+        }
+
+      logInfo(s"Filtering state store on: $evictionExpression")
+      newPredicate(evictionExpression, keyExpressions)
+    }
+  }
+
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
     assert(outputMode.nonEmpty,
@@ -151,25 +175,8 @@ case class StateStoreSaveExec(
               numUpdatedStateRows += 1
             }
 
-            val watermarkAttribute =
-              keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get
-            // If we are evicting based on a window, use the end of the window.  Otherwise just
-            // use the attribute itself.
-            val evictionExpression =
-              if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
-                LessThanOrEqual(
-                  GetStructField(watermarkAttribute, 1),
-                  Literal(eventTimeWatermark.get * 1000))
-              } else {
-                LessThanOrEqual(
-                  watermarkAttribute,
-                  Literal(eventTimeWatermark.get * 1000))
-              }
-
-            logInfo(s"Filtering state store on: $evictionExpression")
-            val predicate = newPredicate(evictionExpression, keyExpressions)
-            store.remove(predicate.eval)
-
+            // Assumption: Append mode can be done only when watermark has been specified
+            store.remove(watermarkPredicate.get.eval)
             store.commit()
 
             numTotalStateRows += store.numKeys()
@@ -180,11 +187,19 @@ case class StateStoreSaveExec(
 
           // Update and output modified rows from the StateStore.
           case Some(Update) =>
+
             new Iterator[InternalRow] {
-              private[this] val baseIterator = iter
+
+              // Filter late date using watermark if specified
+              private[this] val baseIterator = watermarkPredicate match {
+                case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
+                case None => iter
+              }
 
               override def hasNext: Boolean = {
                 if (!baseIterator.hasNext) {
+                  // Remove old aggregates if watermark specified
+                  if (watermarkPredicate.nonEmpty) store.remove(watermarkPredicate.get.eval)
                   store.commit()
                   numTotalStateRows += store.numKeys()
                   false

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index b699be2..91da6b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -193,11 +194,11 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
     if (notCommitted) {
       logDebug(s"Committing batch $batchId to $this")
       outputMode match {
-        case InternalOutputModes.Append | InternalOutputModes.Update =>
+        case Append | Update =>
           val rows = AddedData(batchId, data.collect())
           synchronized { batches += rows }
 
-        case InternalOutputModes.Complete =>
+        case Complete =>
           val rows = AddedData(batchId, data.collect())
           synchronized {
             batches.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b7fc336..6c0c5e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}
 
@@ -65,9 +66,11 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         OutputMode.Append
       case "complete" =>
         OutputMode.Complete
+      case "update" =>
+        OutputMode.Update
       case _ =>
         throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
-          "Accepted output modes are 'append' and 'complete'")
+          "Accepted output modes are 'append', 'complete', 'update'")
     }
     this
   }
@@ -99,7 +102,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     this
   }
 
-
   /**
    * Specifies the name of the [[StreamingQuery]] that can be started with `start()`.
    * This name must be unique among all the currently active queries in the associated SQLContext.
@@ -219,7 +221,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       if (extraOptions.get("queryName").isEmpty) {
         throw new AnalysisException("queryName must be specified for memory sink")
       }
-
+      val supportedModes = "Output modes supported by the memory sink are 'append' and 'complete'."
+      outputMode match {
+        case Append | Complete => // allowed
+        case Update =>
+          throw new AnalysisException(
+            s"Update output mode is not supported for memory sink. $supportedModes")
+        case _ =>
+          throw new AnalysisException(
+            s"$outputMode is not supported for memory sink. $supportedModes")
+      }
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
       val chkpointLoc = extraOptions.get("checkpointLocation")

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
new file mode 100644
index 0000000..ca724fc
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.language.implicitConversions
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+class MemorySinkSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("directly add data in Append output mode") {
+    implicit val schema = new StructType().add(new StructField("value", IntegerType))
+    val sink = new MemorySink(schema, OutputMode.Append)
+
+    // Before adding data, check output
+    assert(sink.latestBatchId === None)
+    checkAnswer(sink.latestBatchData, Seq.empty)
+    checkAnswer(sink.allData, Seq.empty)
+
+    // Add batch 0 and check outputs
+    sink.addBatch(0, 1 to 3)
+    assert(sink.latestBatchId === Some(0))
+    checkAnswer(sink.latestBatchData, 1 to 3)
+    checkAnswer(sink.allData, 1 to 3)
+
+    // Add batch 1 and check outputs
+    sink.addBatch(1, 4 to 6)
+    assert(sink.latestBatchId === Some(1))
+    checkAnswer(sink.latestBatchData, 4 to 6)
+    checkAnswer(sink.allData, 1 to 6)     // new data should get appended to old data
+
+    // Re-add batch 1 with different data, should not be added and outputs should not be changed
+    sink.addBatch(1, 7 to 9)
+    assert(sink.latestBatchId === Some(1))
+    checkAnswer(sink.latestBatchData, 4 to 6)
+    checkAnswer(sink.allData, 1 to 6)
+
+    // Add batch 2 and check outputs
+    sink.addBatch(2, 7 to 9)
+    assert(sink.latestBatchId === Some(2))
+    checkAnswer(sink.latestBatchData, 7 to 9)
+    checkAnswer(sink.allData, 1 to 9)
+  }
+
+  test("directly add data in Update output mode") {
+    implicit val schema = new StructType().add(new StructField("value", IntegerType))
+    val sink = new MemorySink(schema, OutputMode.Update)
+
+    // Before adding data, check output
+    assert(sink.latestBatchId === None)
+    checkAnswer(sink.latestBatchData, Seq.empty)
+    checkAnswer(sink.allData, Seq.empty)
+
+    // Add batch 0 and check outputs
+    sink.addBatch(0, 1 to 3)
+    assert(sink.latestBatchId === Some(0))
+    checkAnswer(sink.latestBatchData, 1 to 3)
+    checkAnswer(sink.allData, 1 to 3)
+
+    // Add batch 1 and check outputs
+    sink.addBatch(1, 4 to 6)
+    assert(sink.latestBatchId === Some(1))
+    checkAnswer(sink.latestBatchData, 4 to 6)
+    checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data
+
+    // Re-add batch 1 with different data, should not be added and outputs should not be changed
+    sink.addBatch(1, 7 to 9)
+    assert(sink.latestBatchId === Some(1))
+    checkAnswer(sink.latestBatchData, 4 to 6)
+    checkAnswer(sink.allData, 1 to 6)
+
+    // Add batch 2 and check outputs
+    sink.addBatch(2, 7 to 9)
+    assert(sink.latestBatchId === Some(2))
+    checkAnswer(sink.latestBatchData, 7 to 9)
+    checkAnswer(sink.allData, 1 to 9)
+  }
+
+  test("directly add data in Complete output mode") {
+    implicit val schema = new StructType().add(new StructField("value", IntegerType))
+    val sink = new MemorySink(schema, OutputMode.Complete)
+
+    // Before adding data, check output
+    assert(sink.latestBatchId === None)
+    checkAnswer(sink.latestBatchData, Seq.empty)
+    checkAnswer(sink.allData, Seq.empty)
+
+    // Add batch 0 and check outputs
+    sink.addBatch(0, 1 to 3)
+    assert(sink.latestBatchId === Some(0))
+    checkAnswer(sink.latestBatchData, 1 to 3)
+    checkAnswer(sink.allData, 1 to 3)
+
+    // Add batch 1 and check outputs
+    sink.addBatch(1, 4 to 6)
+    assert(sink.latestBatchId === Some(1))
+    checkAnswer(sink.latestBatchData, 4 to 6)
+    checkAnswer(sink.allData, 4 to 6)     // new data should replace old data
+
+    // Re-add batch 1 with different data, should not be added and outputs should not be changed
+    sink.addBatch(1, 7 to 9)
+    assert(sink.latestBatchId === Some(1))
+    checkAnswer(sink.latestBatchData, 4 to 6)
+    checkAnswer(sink.allData, 4 to 6)
+
+    // Add batch 2 and check outputs
+    sink.addBatch(2, 7 to 9)
+    assert(sink.latestBatchId === Some(2))
+    checkAnswer(sink.latestBatchData, 7 to 9)
+    checkAnswer(sink.allData, 7 to 9)
+  }
+
+
+  test("registering as a table in Append output mode - supported") {
+    val input = MemoryStream[Int]
+    val query = input.toDF().writeStream
+      .format("memory")
+      .outputMode("append")
+      .queryName("memStream")
+      .start()
+    input.addData(1, 2, 3)
+    query.processAllAvailable()
+
+    checkDataset(
+      spark.table("memStream").as[Int],
+      1, 2, 3)
+
+    input.addData(4, 5, 6)
+    query.processAllAvailable()
+    checkDataset(
+      spark.table("memStream").as[Int],
+      1, 2, 3, 4, 5, 6)
+
+    query.stop()
+  }
+
+  test("registering as a table in Complete output mode - supported") {
+    val input = MemoryStream[Int]
+    val query = input.toDF()
+      .groupBy("value")
+      .count()
+      .writeStream
+      .format("memory")
+      .outputMode("complete")
+      .queryName("memStream")
+      .start()
+    input.addData(1, 2, 3)
+    query.processAllAvailable()
+
+    checkDatasetUnorderly(
+      spark.table("memStream").as[(Int, Long)],
+      (1, 1L), (2, 1L), (3, 1L))
+
+    input.addData(4, 5, 6)
+    query.processAllAvailable()
+    checkDatasetUnorderly(
+      spark.table("memStream").as[(Int, Long)],
+      (1, 1L), (2, 1L), (3, 1L), (4, 1L), (5, 1L), (6, 1L))
+
+    query.stop()
+  }
+
+  test("registering as a table in Update output mode - not supported") {
+    val input = MemoryStream[Int]
+    val df = input.toDF()
+      .groupBy("value")
+      .count()
+    intercept[AnalysisException] {
+      df.writeStream
+        .format("memory")
+        .outputMode("update")
+        .queryName("memStream")
+        .start()
+    }
+  }
+
+  test("MemoryPlan statistics") {
+    implicit val schema = new StructType().add(new StructField("value", IntegerType))
+    val sink = new MemorySink(schema, OutputMode.Append)
+    val plan = new MemoryPlan(sink)
+
+    // Before adding data, check output
+    checkAnswer(sink.allData, Seq.empty)
+    assert(plan.statistics.sizeInBytes === 0)
+
+    sink.addBatch(0, 1 to 3)
+    assert(plan.statistics.sizeInBytes === 12)
+
+    sink.addBatch(1, 4 to 6)
+    assert(plan.statistics.sizeInBytes === 24)
+  }
+
+  ignore("stress test") {
+    // Ignore the stress test as it takes several minutes to run
+    (0 until 1000).foreach { _ =>
+      val input = MemoryStream[Int]
+      val query = input.toDF().writeStream
+        .format("memory")
+        .queryName("memStream")
+        .start()
+      input.addData(1, 2, 3)
+      query.processAllAvailable()
+
+      checkDataset(
+        spark.table("memStream").as[Int],
+        1, 2, 3)
+
+      input.addData(4, 5, 6)
+      query.processAllAvailable()
+      checkDataset(
+        spark.table("memStream").as[Int],
+        1, 2, 3, 4, 5, 6)
+
+      query.stop()
+    }
+  }
+
+  test("error when no name is specified") {
+    val error = intercept[AnalysisException] {
+      val input = MemoryStream[Int]
+      val query = input.toDF().writeStream
+          .format("memory")
+          .start()
+    }
+
+    assert(error.message contains "queryName must be specified")
+  }
+
+  test("error if attempting to resume specific checkpoint") {
+    val location = Utils.createTempDir(namePrefix = "steaming.checkpoint").getCanonicalPath
+
+    val input = MemoryStream[Int]
+    val query = input.toDF().writeStream
+        .format("memory")
+        .queryName("memStream")
+        .option("checkpointLocation", location)
+        .start()
+    input.addData(1, 2, 3)
+    query.processAllAvailable()
+    query.stop()
+
+    intercept[AnalysisException] {
+      input.toDF().writeStream
+        .format("memory")
+        .queryName("memStream")
+        .option("checkpointLocation", location)
+        .start()
+    }
+  }
+
+  private def checkAnswer(rows: Seq[Row], expected: Seq[Int])(implicit schema: StructType): Unit = {
+    checkAnswer(
+      sqlContext.createDataFrame(sparkContext.makeRDD(rows), schema),
+      intsToDF(expected)(schema))
+  }
+
+  private implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = {
+    require(schema.fields.size === 1)
+    sqlContext.createDataset(seq).toDF(schema.fieldNames.head)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index bdfba95..23f51ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.streaming
 
 import java.{util => ju}
 import java.text.SimpleDateFormat
-import java.util.{Calendar, Date}
+import java.util.Date
 
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
-import org.apache.spark.sql.InternalOutputModes.Complete
+import org.apache.spark.sql.streaming.OutputMode._
 
 class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 
@@ -117,7 +117,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
     )
   }
 
-  test("append-mode watermark aggregation") {
+  test("append mode") {
     val inputData = MemoryStream[Int]
 
     val windowedAggregation = inputData.toDF()
@@ -129,11 +129,42 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
 
     testStream(windowedAggregation)(
       AddData(inputData, 10, 11, 12, 13, 14, 15),
-      CheckAnswer(),
-      AddData(inputData, 25), // Advance watermark to 15 seconds
-      CheckAnswer(),
-      AddData(inputData, 25), // Evict items less than previous watermark.
-      CheckAnswer((10, 5))
+      CheckLastBatch(),
+      AddData(inputData, 25),   // Advance watermark to 15 seconds
+      CheckLastBatch(),
+      assertNumStateRows(3),
+      AddData(inputData, 25),   // Emit items less than watermark and drop their state
+      CheckLastBatch((10, 5)),
+      assertNumStateRows(2),
+      AddData(inputData, 10),   // Should not emit anything as data less than watermark
+      CheckLastBatch(),
+      assertNumStateRows(2)
+    )
+  }
+
+  test("update mode") {
+    val inputData = MemoryStream[Int]
+    spark.conf.set("spark.sql.shuffle.partitions", "10")
+
+    val windowedAggregation = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedAggregation, OutputMode.Update)(
+      AddData(inputData, 10, 11, 12, 13, 14, 15),
+      CheckLastBatch((10, 5), (15, 1)),
+      AddData(inputData, 25),     // Advance watermark to 15 seconds
+      CheckLastBatch((25, 1)),
+      assertNumStateRows(3),
+      AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
+      CheckLastBatch((25, 2)),
+      assertNumStateRows(2),
+      AddData(inputData, 10),     // Should not emit anything as data less than watermark
+      CheckLastBatch(),
+      assertNumStateRows(2)
     )
   }
 
@@ -271,6 +302,12 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
     )
   }
 
+  private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
+    val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
+    assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)
+    true
+  }
+
   private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
     AssertOnQuery { q =>
       body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 54efae3..22f59f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
@@ -210,6 +210,26 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  test("Update and Complete output mode not supported") {
+    val df = MemoryStream[Int].toDF().groupBy().count()
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+
+    withTempDir { dir =>
+
+      def testOutputMode(mode: String): Unit = {
+        val e = intercept[AnalysisException] {
+          df.writeStream.format("parquet").outputMode(mode).start(dir.getCanonicalPath)
+        }
+        Seq(mode, "not support").foreach { w =>
+          assert(e.getMessage.toLowerCase.contains(w))
+        }
+      }
+
+      testOutputMode("update")
+      testOutputMode("complete")
+    }
+  }
+
   test("parquet") {
     testFormat(None) // should not throw error as default format parquet when not specified
     testFormat(Some("parquet"))

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 2d218f4..55d927a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -899,7 +899,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       // This is to avoid actually running a Spark job with 10000 tasks
       val df = files.filter("1 == 0").groupBy().count()
 
-      testStream(df, InternalOutputModes.Complete)(
+      testStream(df, OutputMode.Complete)(
         AddTextFileData("0", src, tmp),
         CheckAnswer(0)
       )

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
deleted file mode 100644
index 4e9fba9..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import scala.language.implicitConversions
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-import org.apache.spark.util.Utils
-
-class MemorySinkSuite extends StreamTest with BeforeAndAfter {
-
-  import testImplicits._
-
-  after {
-    sqlContext.streams.active.foreach(_.stop())
-  }
-
-  test("directly add data in Append output mode") {
-    implicit val schema = new StructType().add(new StructField("value", IntegerType))
-    val sink = new MemorySink(schema, InternalOutputModes.Append)
-
-    // Before adding data, check output
-    assert(sink.latestBatchId === None)
-    checkAnswer(sink.latestBatchData, Seq.empty)
-    checkAnswer(sink.allData, Seq.empty)
-
-    // Add batch 0 and check outputs
-    sink.addBatch(0, 1 to 3)
-    assert(sink.latestBatchId === Some(0))
-    checkAnswer(sink.latestBatchData, 1 to 3)
-    checkAnswer(sink.allData, 1 to 3)
-
-    // Add batch 1 and check outputs
-    sink.addBatch(1, 4 to 6)
-    assert(sink.latestBatchId === Some(1))
-    checkAnswer(sink.latestBatchData, 4 to 6)
-    checkAnswer(sink.allData, 1 to 6)     // new data should get appended to old data
-
-    // Re-add batch 1 with different data, should not be added and outputs should not be changed
-    sink.addBatch(1, 7 to 9)
-    assert(sink.latestBatchId === Some(1))
-    checkAnswer(sink.latestBatchData, 4 to 6)
-    checkAnswer(sink.allData, 1 to 6)
-
-    // Add batch 2 and check outputs
-    sink.addBatch(2, 7 to 9)
-    assert(sink.latestBatchId === Some(2))
-    checkAnswer(sink.latestBatchData, 7 to 9)
-    checkAnswer(sink.allData, 1 to 9)
-  }
-
-  test("directly add data in Update output mode") {
-    implicit val schema = new StructType().add(new StructField("value", IntegerType))
-    val sink = new MemorySink(schema, InternalOutputModes.Update)
-
-    // Before adding data, check output
-    assert(sink.latestBatchId === None)
-    checkAnswer(sink.latestBatchData, Seq.empty)
-    checkAnswer(sink.allData, Seq.empty)
-
-    // Add batch 0 and check outputs
-    sink.addBatch(0, 1 to 3)
-    assert(sink.latestBatchId === Some(0))
-    checkAnswer(sink.latestBatchData, 1 to 3)
-    checkAnswer(sink.allData, 1 to 3)
-
-    // Add batch 1 and check outputs
-    sink.addBatch(1, 4 to 6)
-    assert(sink.latestBatchId === Some(1))
-    checkAnswer(sink.latestBatchData, 4 to 6)
-    checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data
-
-    // Re-add batch 1 with different data, should not be added and outputs should not be changed
-    sink.addBatch(1, 7 to 9)
-    assert(sink.latestBatchId === Some(1))
-    checkAnswer(sink.latestBatchData, 4 to 6)
-    checkAnswer(sink.allData, 1 to 6)
-
-    // Add batch 2 and check outputs
-    sink.addBatch(2, 7 to 9)
-    assert(sink.latestBatchId === Some(2))
-    checkAnswer(sink.latestBatchData, 7 to 9)
-    checkAnswer(sink.allData, 1 to 9)
-  }
-
-  test("directly add data in Complete output mode") {
-    implicit val schema = new StructType().add(new StructField("value", IntegerType))
-    val sink = new MemorySink(schema, InternalOutputModes.Complete)
-
-    // Before adding data, check output
-    assert(sink.latestBatchId === None)
-    checkAnswer(sink.latestBatchData, Seq.empty)
-    checkAnswer(sink.allData, Seq.empty)
-
-    // Add batch 0 and check outputs
-    sink.addBatch(0, 1 to 3)
-    assert(sink.latestBatchId === Some(0))
-    checkAnswer(sink.latestBatchData, 1 to 3)
-    checkAnswer(sink.allData, 1 to 3)
-
-    // Add batch 1 and check outputs
-    sink.addBatch(1, 4 to 6)
-    assert(sink.latestBatchId === Some(1))
-    checkAnswer(sink.latestBatchData, 4 to 6)
-    checkAnswer(sink.allData, 4 to 6)     // new data should replace old data
-
-    // Re-add batch 1 with different data, should not be added and outputs should not be changed
-    sink.addBatch(1, 7 to 9)
-    assert(sink.latestBatchId === Some(1))
-    checkAnswer(sink.latestBatchData, 4 to 6)
-    checkAnswer(sink.allData, 4 to 6)
-
-    // Add batch 2 and check outputs
-    sink.addBatch(2, 7 to 9)
-    assert(sink.latestBatchId === Some(2))
-    checkAnswer(sink.latestBatchData, 7 to 9)
-    checkAnswer(sink.allData, 7 to 9)
-  }
-
-
-  test("registering as a table in Append output mode") {
-    val input = MemoryStream[Int]
-    val query = input.toDF().writeStream
-      .format("memory")
-      .outputMode("append")
-      .queryName("memStream")
-      .start()
-    input.addData(1, 2, 3)
-    query.processAllAvailable()
-
-    checkDataset(
-      spark.table("memStream").as[Int],
-      1, 2, 3)
-
-    input.addData(4, 5, 6)
-    query.processAllAvailable()
-    checkDataset(
-      spark.table("memStream").as[Int],
-      1, 2, 3, 4, 5, 6)
-
-    query.stop()
-  }
-
-  test("registering as a table in Complete output mode") {
-    val input = MemoryStream[Int]
-    val query = input.toDF()
-      .groupBy("value")
-      .count()
-      .writeStream
-      .format("memory")
-      .outputMode("complete")
-      .queryName("memStream")
-      .start()
-    input.addData(1, 2, 3)
-    query.processAllAvailable()
-
-    checkDatasetUnorderly(
-      spark.table("memStream").as[(Int, Long)],
-      (1, 1L), (2, 1L), (3, 1L))
-
-    input.addData(4, 5, 6)
-    query.processAllAvailable()
-    checkDatasetUnorderly(
-      spark.table("memStream").as[(Int, Long)],
-      (1, 1L), (2, 1L), (3, 1L), (4, 1L), (5, 1L), (6, 1L))
-
-    query.stop()
-  }
-
-  test("MemoryPlan statistics") {
-    implicit val schema = new StructType().add(new StructField("value", IntegerType))
-    val sink = new MemorySink(schema, InternalOutputModes.Append)
-    val plan = new MemoryPlan(sink)
-
-    // Before adding data, check output
-    checkAnswer(sink.allData, Seq.empty)
-    assert(plan.statistics.sizeInBytes === 0)
-
-    sink.addBatch(0, 1 to 3)
-    assert(plan.statistics.sizeInBytes === 12)
-
-    sink.addBatch(1, 4 to 6)
-    assert(plan.statistics.sizeInBytes === 24)
-  }
-
-  ignore("stress test") {
-    // Ignore the stress test as it takes several minutes to run
-    (0 until 1000).foreach { _ =>
-      val input = MemoryStream[Int]
-      val query = input.toDF().writeStream
-        .format("memory")
-        .queryName("memStream")
-        .start()
-      input.addData(1, 2, 3)
-      query.processAllAvailable()
-
-      checkDataset(
-        spark.table("memStream").as[Int],
-        1, 2, 3)
-
-      input.addData(4, 5, 6)
-      query.processAllAvailable()
-      checkDataset(
-        spark.table("memStream").as[Int],
-        1, 2, 3, 4, 5, 6)
-
-      query.stop()
-    }
-  }
-
-  test("error when no name is specified") {
-    val error = intercept[AnalysisException] {
-      val input = MemoryStream[Int]
-      val query = input.toDF().writeStream
-          .format("memory")
-          .start()
-    }
-
-    assert(error.message contains "queryName must be specified")
-  }
-
-  test("error if attempting to resume specific checkpoint") {
-    val location = Utils.createTempDir(namePrefix = "steaming.checkpoint").getCanonicalPath
-
-    val input = MemoryStream[Int]
-    val query = input.toDF().writeStream
-        .format("memory")
-        .queryName("memStream")
-        .option("checkpointLocation", location)
-        .start()
-    input.addData(1, 2, 3)
-    query.processAllAvailable()
-    query.stop()
-
-    intercept[AnalysisException] {
-      input.toDF().writeStream
-        .format("memory")
-        .queryName("memStream")
-        .option("checkpointLocation", location)
-        .start()
-    }
-  }
-
-  private def checkAnswer(rows: Seq[Row], expected: Seq[Int])(implicit schema: StructType): Unit = {
-    checkAnswer(
-      sqlContext.createDataFrame(sparkContext.makeRDD(rows), schema),
-      intsToDF(expected)(schema))
-  }
-
-  private implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = {
-    require(schema.fields.size === 1)
-    sqlContext.createDataset(seq).toDF(schema.fieldNames.head)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 4a64054..b8fa82d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 import scala.util.control.ControlThrowable
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -265,10 +266,9 @@ class StreamSuite extends StreamTest {
   }
 
   test("output mode API in Scala") {
-    val o1 = OutputMode.Append
-    assert(o1 === InternalOutputModes.Append)
-    val o2 = OutputMode.Complete
-    assert(o2 === InternalOutputModes.Complete)
+    assert(OutputMode.Append === InternalOutputModes.Append)
+    assert(OutputMode.Complete === InternalOutputModes.Complete)
+    assert(OutputMode.Update === InternalOutputModes.Update)
   }
 
   test("explain") {

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index fbe560e..eca2647 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -23,13 +23,13 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.InternalOutputModes._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.streaming.OutputMode._
 
 object FailureSinglton {
   var firstTime = true

http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 9de3da3..097dd6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -23,13 +23,14 @@ import java.util.concurrent.TimeUnit
 import scala.concurrent.duration._
 
 import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatest.PrivateMethodTester.PrivateMethod
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery, StreamTest}
+import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -105,7 +106,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
   }
 }
 
-class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
+class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter with PrivateMethodTester {
 
   private def newMetadataDir =
     Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
@@ -388,19 +389,40 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
   private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath
 
-  test("check outputMode(string) throws exception on unsupported modes") {
-    def testError(outputMode: String): Unit = {
+  test("supported strings in outputMode(string)") {
+    val outputModeMethod = PrivateMethod[OutputMode]('outputMode)
+
+    def testMode(outputMode: String, expected: OutputMode): Unit = {
+      val df = spark.readStream
+        .format("org.apache.spark.sql.streaming.test")
+        .load()
+      val w = df.writeStream
+      w.outputMode(outputMode)
+      val setOutputMode = w invokePrivate outputModeMethod()
+      assert(setOutputMode === expected)
+    }
+
+    testMode("append", OutputMode.Append)
+    testMode("Append", OutputMode.Append)
+    testMode("complete", OutputMode.Complete)
+    testMode("Complete", OutputMode.Complete)
+    testMode("update", OutputMode.Update)
+    testMode("Update", OutputMode.Update)
+  }
+
+  test("unsupported strings in outputMode(string)") {
+    def testMode(outputMode: String): Unit = {
+      val acceptedModes = Seq("append", "update", "complete")
       val df = spark.readStream
         .format("org.apache.spark.sql.streaming.test")
         .load()
       val w = df.writeStream
       val e = intercept[IllegalArgumentException](w.outputMode(outputMode))
-      Seq("output mode", "unknown", outputMode).foreach { s =>
+      (Seq("output mode", "unknown", outputMode) ++ acceptedModes).foreach { s =>
         assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
       }
     }
-    testError("Update")
-    testError("Xyz")
+    testMode("Xyz")
   }
 
   test("check foreach() catches null writers") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message