spark-commits mailing list archives

From t...@apache.org
Subject [1/2] spark git commit: [SPARK-15933][SQL][STREAMING] Refactored DF reader-writer to use readStream and writeStream for streaming DFs
Date Wed, 15 Jun 2016 00:58:49 GMT
Repository: spark
Updated Branches:
  refs/heads/master 5d50d4f0f -> 214adb14b


http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
new file mode 100644
index 0000000..b035ff7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.writeStream]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
+   *                            written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
+   *                              to the sink every time there are some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+    this.outputMode = outputMode
+    this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset will be written to
+   *                 the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
+   *                 every time there are some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+    this.outputMode = outputMode.toLowerCase match {
+      case "append" =>
+        OutputMode.Append
+      case "complete" =>
+        OutputMode.Complete
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
+          "Accepted output modes are 'append' and 'complete'")
+    }
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Sets the trigger for the stream query. The default value is `ProcessingTime(0)`, which runs
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.writeStream.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.writeStream.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.writeStream().trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+    this.trigger = trigger
+    this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies the name of the [[ContinuousQuery]] that can be started with `start()`.
+   * This name must be unique among all the currently active queries in the associated SQLContext.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def queryName(queryName: String): DataStreamWriter[T] = {
+    this.extraOptions += ("queryName" -> queryName)
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def format(source: String): DataStreamWriter[T] = {
+    this.source = source
+    this
+  }
+
+  /**
+   * Partitions the output by the given columns on the file system. If specified, the output is
+   * laid out on the file system similar to Hive's partitioning scheme. As an example, when we
+   * partition a dataset by year and then month, the directory layout would look like:
+   *
+   *   - year=2016/month=01/
+   *   - year=2016/month=02/
+   *
+   * Partitioning is one of the most widely used techniques to optimize physical data layout.
+   * It provides a coarse-grained index for skipping unnecessary data reads when queries have
+   * predicates on the partitioned columns. In order for partitioning to work well, the number
+   * of distinct values in each column should typically be less than tens of thousands.
+   *
+   * This was initially applicable to Parquet but in 1.5+ covers JSON, text, ORC and Avro as well.
+   *
+   * @since 1.4.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(colNames: String*): DataStreamWriter[T] = {
+    this.partitioningColumns = Option(colNames)
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds an output option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: String): DataStreamWriter[T] = {
+    this.extraOptions += (key -> value)
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds an output option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: Boolean): DataStreamWriter[T] = option(key, value.toString)
+
+  /**
+   * :: Experimental ::
+   * Adds an output option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: Long): DataStreamWriter[T] = option(key, value.toString)
+
+  /**
+   * :: Experimental ::
+   * Adds an output option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: Double): DataStreamWriter[T] = option(key, value.toString)
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific) Adds output options for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T] = {
+    this.extraOptions ++= options
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds output options for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def options(options: java.util.Map[String, String]): DataStreamWriter[T] = {
+    this.options(options.asScala)
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually output results to the given
+   * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+   * the stream.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def start(path: String): ContinuousQuery = {
+    option("path", path).start()
+  }
+
+  /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually output results to the
+   * configured sink as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+   * the stream.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def start(): ContinuousQuery = {
+    if (source == "memory") {
+      assertNotPartitioned("memory")
+      if (extraOptions.get("queryName").isEmpty) {
+        throw new AnalysisException("queryName must be specified for memory sink")
+      }
+
+      val sink = new MemorySink(df.schema, outputMode)
+      val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
+      val query = df.sparkSession.sessionState.continuousQueryManager.startQuery(
+        extraOptions.get("queryName"),
+        extraOptions.get("checkpointLocation"),
+        df,
+        sink,
+        outputMode,
+        useTempCheckpointLocation = true,
+        recoverFromCheckpointLocation = false,
+        trigger = trigger)
+      resultDf.createOrReplaceTempView(query.name)
+      query
+    } else if (source == "foreach") {
+      assertNotPartitioned("foreach")
+      val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
+      df.sparkSession.sessionState.continuousQueryManager.startQuery(
+        extraOptions.get("queryName"),
+        extraOptions.get("checkpointLocation"),
+        df,
+        sink,
+        outputMode,
+        useTempCheckpointLocation = true,
+        trigger = trigger)
+    } else {
+      val dataSource =
+        DataSource(
+          df.sparkSession,
+          className = source,
+          options = extraOptions.toMap,
+          partitionColumns = normalizedParCols.getOrElse(Nil))
+      df.sparkSession.sessionState.continuousQueryManager.startQuery(
+        extraOptions.get("queryName"),
+        extraOptions.get("checkpointLocation"),
+        df,
+        dataSource.createSink(outputMode),
+        outputMode,
+        trigger = trigger)
+    }
+  }
+
+  /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually send results to the given
+   * [[ForeachWriter]] as new data arrives. The [[ForeachWriter]] can be used to send the data
+   * generated by the [[DataFrame]]/[[Dataset]] to an external system.
+   *
+   * Scala example:
+   * {{{
+   *   datasetOfString.writeStream.foreach(new ForeachWriter[String] {
+   *
+   *     def open(partitionId: Long, version: Long): Boolean = {
+   *       // open connection
+   *     }
+   *
+   *     def process(record: String) = {
+   *       // write string to connection
+   *     }
+   *
+   *     def close(errorOrNull: Throwable): Unit = {
+   *       // close the connection
+   *     }
+   *   }).start()
+   * }}}
+   *
+   * Java example:
+   * {{{
+   *  datasetOfString.writeStream().foreach(new ForeachWriter<String>() {
+   *
+   *    @Override
+   *    public boolean open(long partitionId, long version) {
+   *      // open connection
+   *    }
+   *
+   *    @Override
+   *    public void process(String value) {
+   *      // write string to connection
+   *    }
+   *
+   *    @Override
+   *    public void close(Throwable errorOrNull) {
+   *      // close the connection
+   *    }
+   *  }).start();
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
+    this.source = "foreach"
+    this.foreachWriter = if (writer != null) {
+      ds.sparkSession.sparkContext.clean(writer)
+    } else {
+      throw new IllegalArgumentException("foreach writer cannot be null")
+    }
+    this
+  }
+
+  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
+    cols.map(normalize(_, "Partition"))
+  }
+
+  /**
+   * The given column name may not exactly match any of the existing column names when the
+   * analysis is case-insensitive. Normalize the given column name to the actual one so that we
+   * don't need to care about case sensitivity afterwards.
+   */
+  private def normalize(columnName: String, columnType: String): String = {
+    val validColumnNames = df.logicalPlan.output.map(_.name)
+    validColumnNames.find(df.sparkSession.sessionState.analyzer.resolver(_, columnName))
+      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
+        s"existing columns (${validColumnNames.mkString(", ")})"))
+  }
+
+  private def assertNotPartitioned(operation: String): Unit = {
+    if (partitioningColumns.isDefined) {
+      throw new AnalysisException(s"'$operation' does not support partitioning")
+    }
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////
+  // Builder pattern config options
+  ///////////////////////////////////////////////////////////////////////////////////////
+
+  private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName
+
+  private var outputMode: OutputMode = OutputMode.Append
+
+  private var trigger: Trigger = ProcessingTime(0L)
+
+  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+
+  private var foreachWriter: ForeachWriter[T] = null
+
+  private var partitioningColumns: Option[Seq[String]] = None
+}
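
Taken together, a minimal end-to-end sketch of the renamed reader and writer entry points, assuming a SparkSession in scope as `spark`; the text source, paths, query name and trigger interval are illustrative placeholders rather than part of this patch:

import org.apache.spark.sql.streaming.ProcessingTime

// Read a stream of text files (readStream/load replace read/stream in this patch).
val input = spark.readStream
  .format("text")
  .load("/tmp/streaming-input")                      // hypothetical input directory

// Write it out with the new DataStreamWriter.
val query = input.writeStream
  .format("parquet")                                 // built-in sink format
  .outputMode("append")                              // or OutputMode.Append()
  .queryName("archive")                              // hypothetical name, unique per session
  .trigger(ProcessingTime("10 seconds"))             // default is ProcessingTime(0)
  .option("checkpointLocation", "/tmp/checkpoints")  // hypothetical checkpoint directory
  .start("/tmp/streaming-output")                    // returns a ContinuousQuery

query.awaitTermination(30 * 1000)                    // wait up to 30s (timed variant used in the suites)
query.stop()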

http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index e1fb3b9..6ff597c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -38,9 +38,10 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
   test("foreach") {
     withTempDir { checkpointDir =>
       val input = MemoryStream[Int]
-      val query = input.toDS().repartition(2).write
+      val query = input.toDS().repartition(2).writeStream
         .option("checkpointLocation", checkpointDir.getCanonicalPath)
         .foreach(new TestForeachWriter())
+        .start()
       input.addData(1, 2, 3, 4)
       query.processAllAvailable()
 
@@ -70,14 +71,14 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
   test("foreach with error") {
     withTempDir { checkpointDir =>
       val input = MemoryStream[Int]
-      val query = input.toDS().repartition(1).write
+      val query = input.toDS().repartition(1).writeStream
         .option("checkpointLocation", checkpointDir.getCanonicalPath)
         .foreach(new TestForeachWriter() {
           override def process(value: Int): Unit = {
             super.process(value)
             throw new RuntimeException("error")
           }
-        })
+        }).start()
       input.addData(1, 2, 3, 4)
       query.processAllAvailable()
 

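TestForeachWriter above is a helper defined in this suite; a self-contained sketch of the same pattern against the public ForeachWriter API, assuming the StreamTest harness (for the MemoryStream implicits) and a temporary checkpoint directory; the println body is a stand-in for real connection handling:

import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming.MemoryStream

val input = MemoryStream[Int]
val query = input.toDS().writeStream
  .option("checkpointLocation", checkpointDir.getCanonicalPath)          // some temp directory
  .foreach(new ForeachWriter[Int] {
    override def open(partitionId: Long, version: Long): Boolean = true  // accept every partition/version
    override def process(value: Int): Unit = println(value)              // stand-in for writing to a connection
    override def close(errorOrNull: Throwable): Unit = ()                // release resources here
  })
  .start()

input.addData(1, 2, 3, 4)
query.processAllAvailable()
query.stop()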
http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index f81608b..ef2fcbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -225,12 +225,12 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
             val metadataRoot =
               Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
             query =
-              df.write
+              df.writeStream
                 .format("memory")
                 .queryName(s"query$i")
                 .option("checkpointLocation", metadataRoot)
                 .outputMode("append")
-                .startStream()
+                .start()
                 .asInstanceOf[StreamExecution]
           } catch {
             case NonFatal(e) =>

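For context, the manager these suites go through is reachable as spark.streams; a minimal sketch of the calls they rely on, assuming some queries have already been started:

// All currently active continuous queries in this session.
val active = spark.streams.active

// Names are unique among active queries (enforced when start() is called).
assert(active.map(_.name).toSet.size == active.length)

// Stop everything, as the suites do in their cleanup blocks.
active.foreach(_.stop())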
http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index 43a8857..ad6bc27 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -39,12 +39,12 @@ class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
 
     def startQuery(queryName: String): ContinuousQuery = {
       val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
-      val writer = mapped.write
+      val writer = mapped.writeStream
       writer
         .queryName(queryName)
         .format("memory")
         .option("checkpointLocation", metadataRoot)
-        .startStream()
+        .start()
     }
 
     val q1 = startQuery("q1")

http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index bb3063d..a5acc97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -128,10 +128,10 @@ class FileStreamSinkSuite extends StreamTest {
 
     try {
       query =
-        df.write
-          .format("parquet")
+        df.writeStream
           .option("checkpointLocation", checkpointDir)
-          .startStream(outputDir)
+          .format("parquet")
+          .start(outputDir)
 
       inputData.addData(1, 2, 3)
 
@@ -162,11 +162,11 @@ class FileStreamSinkSuite extends StreamTest {
       query =
         ds.map(i => (i, i * 1000))
           .toDF("id", "value")
-          .write
-          .format("parquet")
+          .writeStream
           .partitionBy("id")
           .option("checkpointLocation", checkpointDir)
-          .startStream(outputDir)
+          .format("parquet")
+          .start(outputDir)
 
       inputData.addData(1, 2, 3)
       failAfter(streamingTimeout) {
@@ -246,13 +246,13 @@ class FileStreamSinkSuite extends StreamTest {
         val writer =
           ds.map(i => (i, i * 1000))
             .toDF("id", "value")
-            .write
+            .writeStream
         if (format.nonEmpty) {
           writer.format(format.get)
         }
         query = writer
             .option("checkpointLocation", checkpointDir)
-            .startStream(outputDir)
+            .start(outputDir)
       } finally {
         if (query != null) {
           query.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index f681b88..6971f93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -107,11 +107,11 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
       schema: Option[StructType] = None): DataFrame = {
     val reader =
       if (schema.isDefined) {
-        spark.read.format(format).schema(schema.get)
+        spark.readStream.format(format).schema(schema.get)
       } else {
-        spark.read.format(format)
+        spark.readStream.format(format)
       }
-    reader.stream(path)
+    reader.load(path)
   }
 
   protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = {
@@ -153,14 +153,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       format: Option[String],
       path: Option[String],
       schema: Option[StructType] = None): StructType = {
-    val reader = spark.read
+    val reader = spark.readStream
     format.foreach(reader.format)
     schema.foreach(reader.schema)
     val df =
       if (path.isDefined) {
-        reader.stream(path.get)
+        reader.load(path.get)
       } else {
-        reader.stream()
+        reader.load()
       }
     df.queryExecution.analyzed
       .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head

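On the reader side the rename is symmetric; a short sketch of the two load paths the helper above exercises (the json format, path, and column are illustrative):

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val userSchema = StructType(StructField("a", IntegerType) :: Nil)   // hypothetical schema

// File-based sources take a user-provided schema plus a path...
val withSchema = spark.readStream
  .format("json")                                    // hypothetical format
  .schema(userSchema)
  .load("/tmp/input")                                // hypothetical path

// ...while sources that declare their own schema can be loaded without either.
val selfDescribing = spark.readStream
  .format("org.apache.spark.sql.streaming.test")     // the suite's dummy provider
  .load()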
http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 1c0fb34..0e157cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -98,7 +98,7 @@ class FileStressSuite extends StreamTest {
     }
     writer.start()
 
-    val input = spark.read.format("text").stream(inputDir)
+    val input = spark.readStream.format("text").load(inputDir)
 
     def startStream(): ContinuousQuery = {
       val output = input
@@ -116,17 +116,17 @@ class FileStressSuite extends StreamTest {
 
       if (partitionWrites) {
         output
-          .write
+          .writeStream
           .partitionBy("id")
           .format("parquet")
           .option("checkpointLocation", checkpoint)
-          .startStream(outputDir)
+          .start(outputDir)
       } else {
         output
-          .write
+          .writeStream
           .format("parquet")
           .option("checkpointLocation", checkpoint)
-          .startStream(outputDir)
+          .start(outputDir)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 9aada0b..310d756 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -140,11 +140,11 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
 
   test("registering as a table in Append output mode") {
     val input = MemoryStream[Int]
-    val query = input.toDF().write
+    val query = input.toDF().writeStream
       .format("memory")
       .outputMode("append")
       .queryName("memStream")
-      .startStream()
+      .start()
     input.addData(1, 2, 3)
     query.processAllAvailable()
 
@@ -166,11 +166,11 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     val query = input.toDF()
       .groupBy("value")
       .count()
-      .write
+      .writeStream
       .format("memory")
       .outputMode("complete")
       .queryName("memStream")
-      .startStream()
+      .start()
     input.addData(1, 2, 3)
     query.processAllAvailable()
 
@@ -191,10 +191,10 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     // Ignore the stress test as it takes several minutes to run
     (0 until 1000).foreach { _ =>
       val input = MemoryStream[Int]
-      val query = input.toDF().write
+      val query = input.toDF().writeStream
         .format("memory")
         .queryName("memStream")
-        .startStream()
+        .start()
       input.addData(1, 2, 3)
       query.processAllAvailable()
 
@@ -215,9 +215,9 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
   test("error when no name is specified") {
     val error = intercept[AnalysisException] {
       val input = MemoryStream[Int]
-      val query = input.toDF().write
+      val query = input.toDF().writeStream
           .format("memory")
-          .startStream()
+          .start()
     }
 
     assert(error.message contains "queryName must be specified")
@@ -227,21 +227,21 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     val location = Utils.createTempDir(namePrefix = "steaming.checkpoint").getCanonicalPath
 
     val input = MemoryStream[Int]
-    val query = input.toDF().write
+    val query = input.toDF().writeStream
         .format("memory")
         .queryName("memStream")
         .option("checkpointLocation", location)
-        .startStream()
+        .start()
     input.addData(1, 2, 3)
     query.processAllAvailable()
     query.stop()
 
     intercept[AnalysisException] {
-      input.toDF().write
+      input.toDF().writeStream
         .format("memory")
         .queryName("memStream")
         .option("checkpointLocation", location)
-        .startStream()
+        .start()
     }
   }
 

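As the memory-sink path in DataStreamWriter.start() shows, queryName doubles as the name of a temporary view holding the results; a sketch of reading them back, again assuming the StreamTest harness for the MemoryStream implicits:

import org.apache.spark.sql.execution.streaming.MemoryStream

val input = MemoryStream[Int]
val query = input.toDF()
  .groupBy("value").count()
  .writeStream
  .format("memory")
  .outputMode("complete")     // re-emit the full aggregate on every trigger
  .queryName("memStream")     // required for the memory sink; also names the temp view
  .start()

input.addData(1, 2, 3)
query.processAllAvailable()
spark.table("memStream").show()   // the registered view exposes the sink's current contents
query.stop()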
http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 9414b1c..786404a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -89,9 +89,9 @@ class StreamSuite extends StreamTest {
     def assertDF(df: DataFrame) {
       withTempDir { outputDir =>
         withTempDir { checkpointDir =>
-          val query = df.write.format("parquet")
+          val query = df.writeStream.format("parquet")
             .option("checkpointLocation", checkpointDir.getAbsolutePath)
-            .startStream(outputDir.getAbsolutePath)
+            .start(outputDir.getAbsolutePath)
           try {
             query.processAllAvailable()
             val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long]
@@ -103,7 +103,7 @@ class StreamSuite extends StreamTest {
       }
     }
 
-    val df = spark.read.format(classOf[FakeDefaultSource].getName).stream()
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
     assertDF(df)
     assertDF(df)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 8681199..7f44227 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -40,6 +40,8 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
 
   import testImplicits._
 
+
+
   test("simple count, update mode") {
     val inputData = MemoryStream[Int]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
deleted file mode 100644
index 6e0d66a..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ /dev/null
@@ -1,621 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming.test
-
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.duration._
-
-import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, StreamTest}
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-import org.apache.spark.util.Utils
-
-object LastOptions {
-
-  var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
-  var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
-  var parameters: Map[String, String] = null
-  var schema: Option[StructType] = null
-  var partitionColumns: Seq[String] = Nil
-
-  def clear(): Unit = {
-    parameters = null
-    schema = null
-    partitionColumns = null
-    reset(mockStreamSourceProvider)
-    reset(mockStreamSinkProvider)
-  }
-}
-
-/** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */
-class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
-
-  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
-
-  override def sourceSchema(
-      spark: SQLContext,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): (String, StructType) = {
-    LastOptions.parameters = parameters
-    LastOptions.schema = schema
-    LastOptions.mockStreamSourceProvider.sourceSchema(spark, schema, providerName, parameters)
-    ("dummySource", fakeSchema)
-  }
-
-  override def createSource(
-      spark: SQLContext,
-      metadataPath: String,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): Source = {
-    LastOptions.parameters = parameters
-    LastOptions.schema = schema
-    LastOptions.mockStreamSourceProvider.createSource(
-      spark, metadataPath, schema, providerName, parameters)
-    new Source {
-      override def schema: StructType = fakeSchema
-
-      override def getOffset: Option[Offset] = Some(new LongOffset(0))
-
-      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-        import spark.implicits._
-
-        Seq[Int]().toDS().toDF()
-      }
-    }
-  }
-
-  override def createSink(
-      spark: SQLContext,
-      parameters: Map[String, String],
-      partitionColumns: Seq[String],
-      outputMode: OutputMode): Sink = {
-    LastOptions.parameters = parameters
-    LastOptions.partitionColumns = partitionColumns
-    LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
-    new Sink {
-      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
-    }
-  }
-}
-
-class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
-
-  private def newMetadataDir =
-    Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
-
-  after {
-    spark.streams.active.foreach(_.stop())
-  }
-
-  test("resolve default source") {
-    spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-      .write
-      .format("org.apache.spark.sql.streaming.test")
-      .option("checkpointLocation", newMetadataDir)
-      .startStream()
-      .stop()
-  }
-
-  test("resolve full class") {
-    spark.read
-      .format("org.apache.spark.sql.streaming.test.DefaultSource")
-      .stream()
-      .write
-      .format("org.apache.spark.sql.streaming.test")
-      .option("checkpointLocation", newMetadataDir)
-      .startStream()
-      .stop()
-  }
-
-  test("options") {
-    val map = new java.util.HashMap[String, String]
-    map.put("opt3", "3")
-
-    val df = spark.read
-        .format("org.apache.spark.sql.streaming.test")
-        .option("opt1", "1")
-        .options(Map("opt2" -> "2"))
-        .options(map)
-        .stream()
-
-    assert(LastOptions.parameters("opt1") == "1")
-    assert(LastOptions.parameters("opt2") == "2")
-    assert(LastOptions.parameters("opt3") == "3")
-
-    LastOptions.clear()
-
-    df.write
-      .format("org.apache.spark.sql.streaming.test")
-      .option("opt1", "1")
-      .options(Map("opt2" -> "2"))
-      .options(map)
-      .option("checkpointLocation", newMetadataDir)
-      .startStream()
-      .stop()
-
-    assert(LastOptions.parameters("opt1") == "1")
-    assert(LastOptions.parameters("opt2") == "2")
-    assert(LastOptions.parameters("opt3") == "3")
-  }
-
-  test("partitioning") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-
-    df.write
-      .format("org.apache.spark.sql.streaming.test")
-      .option("checkpointLocation", newMetadataDir)
-      .startStream()
-      .stop()
-    assert(LastOptions.partitionColumns == Nil)
-
-    df.write
-      .format("org.apache.spark.sql.streaming.test")
-      .option("checkpointLocation", newMetadataDir)
-      .partitionBy("a")
-      .startStream()
-      .stop()
-    assert(LastOptions.partitionColumns == Seq("a"))
-
-    withSQLConf("spark.sql.caseSensitive" -> "false") {
-      df.write
-        .format("org.apache.spark.sql.streaming.test")
-        .option("checkpointLocation", newMetadataDir)
-        .partitionBy("A")
-        .startStream()
-        .stop()
-      assert(LastOptions.partitionColumns == Seq("a"))
-    }
-
-    intercept[AnalysisException] {
-      df.write
-        .format("org.apache.spark.sql.streaming.test")
-        .option("checkpointLocation", newMetadataDir)
-        .partitionBy("b")
-        .startStream()
-        .stop()
-    }
-  }
-
-  test("stream paths") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .option("checkpointLocation", newMetadataDir)
-      .stream("/test")
-
-    assert(LastOptions.parameters("path") == "/test")
-
-    LastOptions.clear()
-
-    df.write
-      .format("org.apache.spark.sql.streaming.test")
-      .option("checkpointLocation", newMetadataDir)
-      .startStream("/test")
-      .stop()
-
-    assert(LastOptions.parameters("path") == "/test")
-  }
-
-  test("test different data types for options") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .option("intOpt", 56)
-      .option("boolOpt", false)
-      .option("doubleOpt", 6.7)
-      .stream("/test")
-
-    assert(LastOptions.parameters("intOpt") == "56")
-    assert(LastOptions.parameters("boolOpt") == "false")
-    assert(LastOptions.parameters("doubleOpt") == "6.7")
-
-    LastOptions.clear()
-    df.write
-      .format("org.apache.spark.sql.streaming.test")
-      .option("intOpt", 56)
-      .option("boolOpt", false)
-      .option("doubleOpt", 6.7)
-      .option("checkpointLocation", newMetadataDir)
-      .startStream("/test")
-      .stop()
-
-    assert(LastOptions.parameters("intOpt") == "56")
-    assert(LastOptions.parameters("boolOpt") == "false")
-    assert(LastOptions.parameters("doubleOpt") == "6.7")
-  }
-
-  test("unique query names") {
-
-    /** Start a query with a specific name */
-    def startQueryWithName(name: String = ""): ContinuousQuery = {
-      spark.read
-        .format("org.apache.spark.sql.streaming.test")
-        .stream("/test")
-        .write
-        .format("org.apache.spark.sql.streaming.test")
-        .option("checkpointLocation", newMetadataDir)
-        .queryName(name)
-        .startStream()
-    }
-
-    /** Start a query without specifying a name */
-    def startQueryWithoutName(): ContinuousQuery = {
-      spark.read
-        .format("org.apache.spark.sql.streaming.test")
-        .stream("/test")
-        .write
-        .format("org.apache.spark.sql.streaming.test")
-        .option("checkpointLocation", newMetadataDir)
-        .startStream()
-    }
-
-    /** Get the names of active streams */
-    def activeStreamNames: Set[String] = {
-      val streams = spark.streams.active
-      val names = streams.map(_.name).toSet
-      assert(streams.length === names.size, s"names of active queries are not unique: $names")
-      names
-    }
-
-    val q1 = startQueryWithName("name")
-
-    // Should not be able to start another query with the same name
-    intercept[IllegalArgumentException] {
-      startQueryWithName("name")
-    }
-    assert(activeStreamNames === Set("name"))
-
-    // Should be able to start queries with other names
-    val q3 = startQueryWithName("another-name")
-    assert(activeStreamNames === Set("name", "another-name"))
-
-    // Should be able to start queries with auto-generated names
-    val q4 = startQueryWithoutName()
-    assert(activeStreamNames.contains(q4.name))
-
-    // Should not be able to start a query with same auto-generated name
-    intercept[IllegalArgumentException] {
-      startQueryWithName(q4.name)
-    }
-
-    // Should be able to start query with that name after stopping the previous query
-    q1.stop()
-    val q5 = startQueryWithName("name")
-    assert(activeStreamNames.contains("name"))
-    spark.streams.active.foreach(_.stop())
-  }
-
-  test("trigger") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream("/test")
-
-    var q = df.write
-      .format("org.apache.spark.sql.streaming.test")
-      .option("checkpointLocation", newMetadataDir)
-      .trigger(ProcessingTime(10.seconds))
-      .startStream()
-    q.stop()
-
-    assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(10000))
-
-    q = df.write
-      .format("org.apache.spark.sql.streaming.test")
-      .option("checkpointLocation", newMetadataDir)
-      .trigger(ProcessingTime.create(100, TimeUnit.SECONDS))
-      .startStream()
-    q.stop()
-
-    assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000))
-  }
-
-  test("source metadataPath") {
-    LastOptions.clear()
-
-    val checkpointLocation = newMetadataDir
-
-    val df1 = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-
-    val df2 = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-
-    val q = df1.union(df2).write
-      .format("org.apache.spark.sql.streaming.test")
-      .option("checkpointLocation", checkpointLocation)
-      .trigger(ProcessingTime(10.seconds))
-      .startStream()
-    q.stop()
-
-    verify(LastOptions.mockStreamSourceProvider).createSource(
-      spark.sqlContext,
-      checkpointLocation + "/sources/0",
-      None,
-      "org.apache.spark.sql.streaming.test",
-      Map.empty)
-
-    verify(LastOptions.mockStreamSourceProvider).createSource(
-      spark.sqlContext,
-      checkpointLocation + "/sources/1",
-      None,
-      "org.apache.spark.sql.streaming.test",
-      Map.empty)
-  }
-
-  private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath
-
-  test("check trigger() can only be called on continuous queries") {
-    val df = spark.read.text(newTextInput)
-    val w = df.write.option("checkpointLocation", newMetadataDir)
-    val e = intercept[AnalysisException](w.trigger(ProcessingTime("10 seconds")))
-    assert(e.getMessage == "trigger() can only be called on continuous queries;")
-  }
-
-  test("check queryName() can only be called on continuous queries") {
-    val df = spark.read.text(newTextInput)
-    val w = df.write.option("checkpointLocation", newMetadataDir)
-    val e = intercept[AnalysisException](w.queryName("queryName"))
-    assert(e.getMessage == "queryName() can only be called on continuous queries;")
-  }
-
-  test("check startStream() can only be called on continuous queries") {
-    val df = spark.read.text(newTextInput)
-    val w = df.write.option("checkpointLocation", newMetadataDir)
-    val e = intercept[AnalysisException](w.startStream())
-    assert(e.getMessage == "startStream() can only be called on continuous queries;")
-  }
-
-  test("check startStream(path) can only be called on continuous queries") {
-    val df = spark.read.text(newTextInput)
-    val w = df.write.option("checkpointLocation", newMetadataDir)
-    val e = intercept[AnalysisException](w.startStream("non_exist_path"))
-    assert(e.getMessage == "startStream() can only be called on continuous queries;")
-  }
-
-  test("check mode(SaveMode) can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.mode(SaveMode.Append))
-    assert(e.getMessage == "mode() can only be called on non-continuous queries;")
-  }
-
-  test("check mode(string) can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.mode("append"))
-    assert(e.getMessage == "mode() can only be called on non-continuous queries;")
-  }
-
-  test("check outputMode(OutputMode) can only be called on continuous queries") {
-    val df = spark.read.text(newTextInput)
-    val w = df.write.option("checkpointLocation", newMetadataDir)
-    val e = intercept[AnalysisException](w.outputMode(OutputMode.Append))
-    Seq("outputmode", "continuous queries").foreach { s =>
-      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
-    }
-  }
-
-  test("check outputMode(string) can only be called on continuous queries") {
-    val df = spark.read.text(newTextInput)
-    val w = df.write.option("checkpointLocation", newMetadataDir)
-    val e = intercept[AnalysisException](w.outputMode("append"))
-    Seq("outputmode", "continuous queries").foreach { s =>
-      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
-    }
-  }
-
-  test("check outputMode(string) throws exception on unsupported modes") {
-    def testError(outputMode: String): Unit = {
-      val df = spark.read
-        .format("org.apache.spark.sql.streaming.test")
-        .stream()
-      val w = df.write
-      val e = intercept[IllegalArgumentException](w.outputMode(outputMode))
-      Seq("output mode", "unknown", outputMode).foreach { s =>
-        assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
-      }
-    }
-    testError("Update")
-    testError("Xyz")
-  }
-
-  test("check bucketBy() can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.bucketBy(1, "text").startStream())
-    assert(e.getMessage == "'startStream' does not support bucketing right now;")
-  }
-
-  test("check sortBy() can only be called on non-continuous queries;") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.sortBy("text").startStream())
-    assert(e.getMessage == "'startStream' does not support bucketing right now;")
-  }
-
-  test("check save(path) can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.save("non_exist_path"))
-    assert(e.getMessage == "save() can only be called on non-continuous queries;")
-  }
-
-  test("check save() can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.save())
-    assert(e.getMessage == "save() can only be called on non-continuous queries;")
-  }
-
-  test("check insertInto() can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.insertInto("non_exsit_table"))
-    assert(e.getMessage == "insertInto() can only be called on non-continuous queries;")
-  }
-
-  test("check saveAsTable() can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.saveAsTable("non_exsit_table"))
-    assert(e.getMessage == "saveAsTable() can only be called on non-continuous queries;")
-  }
-
-  test("check jdbc() can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.jdbc(null, null, null))
-    assert(e.getMessage == "jdbc() can only be called on non-continuous queries;")
-  }
-
-  test("check json() can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.json("non_exist_path"))
-    assert(e.getMessage == "json() can only be called on non-continuous queries;")
-  }
-
-  test("check parquet() can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.parquet("non_exist_path"))
-    assert(e.getMessage == "parquet() can only be called on non-continuous queries;")
-  }
-
-  test("check orc() can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.orc("non_exist_path"))
-    assert(e.getMessage == "orc() can only be called on non-continuous queries;")
-  }
-
-  test("check text() can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.text("non_exist_path"))
-    assert(e.getMessage == "text() can only be called on non-continuous queries;")
-  }
-
-  test("check csv() can only be called on non-continuous queries") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-    val w = df.write
-    val e = intercept[AnalysisException](w.csv("non_exist_path"))
-    assert(e.getMessage == "csv() can only be called on non-continuous queries;")
-  }
-
-  test("check foreach() does not support partitioning or bucketing") {
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-
-    var w = df.write.partitionBy("value")
-    var e = intercept[AnalysisException](w.foreach(null))
-    Seq("foreach", "partitioning").foreach { s =>
-      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
-    }
-
-    w = df.write.bucketBy(2, "value")
-    e = intercept[AnalysisException](w.foreach(null))
-    Seq("foreach", "bucketing").foreach { s =>
-      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
-    }
-  }
-
-  test("check jdbc() does not support partitioning or bucketing") {
-    val df = spark.read.text(newTextInput)
-
-    var w = df.write.partitionBy("value")
-    var e = intercept[AnalysisException](w.jdbc(null, null, null))
-    Seq("jdbc", "partitioning").foreach { s =>
-      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
-    }
-
-    w = df.write.bucketBy(2, "value")
-    e = intercept[AnalysisException](w.jdbc(null, null, null))
-    Seq("jdbc", "bucketing").foreach { s =>
-      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
-    }
-  }
-
-  test("ConsoleSink can be correctly loaded") {
-    LastOptions.clear()
-    val df = spark.read
-      .format("org.apache.spark.sql.streaming.test")
-      .stream()
-
-    val cq = df.write
-      .format("console")
-      .option("checkpointLocation", newMetadataDir)
-      .trigger(ProcessingTime(2.seconds))
-      .startStream()
-
-    cq.awaitTermination(2000L)
-  }
-
-  test("prevent all column partitioning") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      intercept[AnalysisException] {
-        spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
-      }
-      intercept[AnalysisException] {
-        spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
new file mode 100644
index 0000000..c6d374f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.test
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._
+
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, StreamTest}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+object LastOptions {
+
+  var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
+  var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
+  var parameters: Map[String, String] = null
+  var schema: Option[StructType] = null
+  var partitionColumns: Seq[String] = Nil
+
+  def clear(): Unit = {
+    parameters = null
+    schema = null
+    partitionColumns = null
+    reset(mockStreamSourceProvider)
+    reset(mockStreamSinkProvider)
+  }
+}
+
+/** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */
+class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
+
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      spark: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    LastOptions.parameters = parameters
+    LastOptions.schema = schema
+    LastOptions.mockStreamSourceProvider.sourceSchema(spark, schema, providerName, parameters)
+    ("dummySource", fakeSchema)
+  }
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    LastOptions.parameters = parameters
+    LastOptions.schema = schema
+    LastOptions.mockStreamSourceProvider.createSource(
+      spark, metadataPath, schema, providerName, parameters)
+    new Source {
+      override def schema: StructType = fakeSchema
+
+      override def getOffset: Option[Offset] = Some(new LongOffset(0))
+
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        import spark.implicits._
+
+        Seq[Int]().toDS().toDF()
+      }
+    }
+  }
+
+  override def createSink(
+      spark: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    LastOptions.parameters = parameters
+    LastOptions.partitionColumns = partitionColumns
+    LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
+    new Sink {
+      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
+    }
+  }
+}
+
+class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
+
+  private def newMetadataDir =
+    Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  after {
+    spark.streams.active.foreach(_.stop())
+  }
+
+  test("write cannot be called on streaming datasets") {
+    val e = intercept[AnalysisException] {
+      spark.readStream
+        .format("org.apache.spark.sql.streaming.test")
+        .load()
+        .write
+        .save()
+    }
+    Seq("'write'", "not", "streaming Dataset/DataFrame").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+  }
+
+  test("resolve default source") {
+    spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+      .writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .start()
+      .stop()
+  }
+
+  test("resolve full class") {
+    spark.readStream
+      .format("org.apache.spark.sql.streaming.test.DefaultSource")
+      .load()
+      .writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .start()
+      .stop()
+  }
+
+  test("options") {
+    val map = new java.util.HashMap[String, String]
+    map.put("opt3", "3")
+
+    val df = spark.readStream
+        .format("org.apache.spark.sql.streaming.test")
+        .option("opt1", "1")
+        .options(Map("opt2" -> "2"))
+        .options(map)
+        .load()
+
+    assert(LastOptions.parameters("opt1") == "1")
+    assert(LastOptions.parameters("opt2") == "2")
+    assert(LastOptions.parameters("opt3") == "3")
+
+    LastOptions.clear()
+
+    df.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("opt1", "1")
+      .options(Map("opt2" -> "2"))
+      .options(map)
+      .option("checkpointLocation", newMetadataDir)
+      .start()
+      .stop()
+
+    assert(LastOptions.parameters("opt1") == "1")
+    assert(LastOptions.parameters("opt2") == "2")
+    assert(LastOptions.parameters("opt3") == "3")
+  }
+
+  test("partitioning") {
+    val df = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+
+    df.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .start()
+      .stop()
+    assert(LastOptions.partitionColumns == Nil)
+
+    df.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .partitionBy("a")
+      .start()
+      .stop()
+    assert(LastOptions.partitionColumns == Seq("a"))
+
+    withSQLConf("spark.sql.caseSensitive" -> "false") {
+      df.writeStream
+        .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", newMetadataDir)
+        .partitionBy("A")
+        .start()
+        .stop()
+      assert(LastOptions.partitionColumns == Seq("a"))
+    }
+
+    intercept[AnalysisException] {
+      df.writeStream
+        .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", newMetadataDir)
+        .partitionBy("b")
+        .start()
+        .stop()
+    }
+  }
+
+  test("stream paths") {
+    val df = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .load("/test")
+
+    assert(LastOptions.parameters("path") == "/test")
+
+    LastOptions.clear()
+
+    df.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .start("/test")
+      .stop()
+
+    assert(LastOptions.parameters("path") == "/test")
+  }
+
+  test("test different data types for options") {
+    val df = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("intOpt", 56)
+      .option("boolOpt", false)
+      .option("doubleOpt", 6.7)
+      .load("/test")
+
+    assert(LastOptions.parameters("intOpt") == "56")
+    assert(LastOptions.parameters("boolOpt") == "false")
+    assert(LastOptions.parameters("doubleOpt") == "6.7")
+
+    LastOptions.clear()
+    df.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("intOpt", 56)
+      .option("boolOpt", false)
+      .option("doubleOpt", 6.7)
+      .option("checkpointLocation", newMetadataDir)
+      .start("/test")
+      .stop()
+
+    assert(LastOptions.parameters("intOpt") == "56")
+    assert(LastOptions.parameters("boolOpt") == "false")
+    assert(LastOptions.parameters("doubleOpt") == "6.7")
+  }
+
+  test("unique query names") {
+
+    /** Start a query with a specific name */
+    def startQueryWithName(name: String = ""): ContinuousQuery = {
+      spark.readStream
+        .format("org.apache.spark.sql.streaming.test")
+        .load("/test")
+        .writeStream
+        .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", newMetadataDir)
+        .queryName(name)
+        .start()
+    }
+
+    /** Start a query without specifying a name */
+    def startQueryWithoutName(): ContinuousQuery = {
+      spark.readStream
+        .format("org.apache.spark.sql.streaming.test")
+        .load("/test")
+        .writeStream
+        .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", newMetadataDir)
+        .start()
+    }
+
+    /** Get the names of active streams */
+    def activeStreamNames: Set[String] = {
+      val streams = spark.streams.active
+      val names = streams.map(_.name).toSet
+      assert(streams.length === names.size, s"names of active queries are not unique: $names")
+      names
+    }
+
+    val q1 = startQueryWithName("name")
+
+    // Should not be able to start another query with the same name
+    intercept[IllegalArgumentException] {
+      startQueryWithName("name")
+    }
+    assert(activeStreamNames === Set("name"))
+
+    // Should be able to start queries with other names
+    val q3 = startQueryWithName("another-name")
+    assert(activeStreamNames === Set("name", "another-name"))
+
+    // Should be able to start queries with auto-generated names
+    val q4 = startQueryWithoutName()
+    assert(activeStreamNames.contains(q4.name))
+
+    // Should not be able to start a query with the same auto-generated name
+    intercept[IllegalArgumentException] {
+      startQueryWithName(q4.name)
+    }
+
+    // Should be able to start a query with that name after stopping the previous query
+    q1.stop()
+    val q5 = startQueryWithName("name")
+    assert(activeStreamNames.contains("name"))
+    spark.streams.active.foreach(_.stop())
+  }
+
+  test("trigger") {
+    val df = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load("/test")
+
+    var q = df.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .trigger(ProcessingTime(10.seconds))
+      .start()
+    q.stop()
+
+    assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(10000))
+
+    q = df.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .trigger(ProcessingTime.create(100, TimeUnit.SECONDS))
+      .start()
+    q.stop()
+
+    assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000))
+  }
+
+  test("source metadataPath") {
+    LastOptions.clear()
+
+    val checkpointLocation = newMetadataDir
+
+    val df1 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+
+    val df2 = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+
+    val q = df1.union(df2).writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation)
+      .trigger(ProcessingTime(10.seconds))
+      .start()
+    q.stop()
+
+    verify(LastOptions.mockStreamSourceProvider).createSource(
+      spark.sqlContext,
+      checkpointLocation + "/sources/0",
+      None,
+      "org.apache.spark.sql.streaming.test",
+      Map.empty)
+
+    verify(LastOptions.mockStreamSourceProvider).createSource(
+      spark.sqlContext,
+      checkpointLocation + "/sources/1",
+      None,
+      "org.apache.spark.sql.streaming.test",
+      Map.empty)
+  }
+
+  private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath
+
+  test("check outputMode(string) throws exception on unsupported modes") {
+    def testError(outputMode: String): Unit = {
+      val df = spark.readStream
+        .format("org.apache.spark.sql.streaming.test")
+        .load()
+      val w = df.writeStream
+      val e = intercept[IllegalArgumentException](w.outputMode(outputMode))
+      Seq("output mode", "unknown", outputMode).foreach { s =>
+        assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+      }
+    }
+    testError("Update")
+    testError("Xyz")
+  }
+
+  test("check foreach() catches null writers") {
+    val df = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+
+    val w = df.writeStream
+    val e = intercept[IllegalArgumentException](w.foreach(null))
+    Seq("foreach", "null").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+  }
+
+  test("check foreach() does not support partitioning") {
+    val df = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+    val foreachWriter = new ForeachWriter[Row] {
+      override def open(partitionId: Long, version: Long): Boolean = false
+      override def process(value: Row): Unit = {}
+      override def close(errorOrNull: Throwable): Unit = {}
+    }
+    val w = df.writeStream.partitionBy("value")
+    val e = intercept[AnalysisException](w.foreach(foreachWriter).start())
+    Seq("foreach", "partitioning").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+  }
+
+  test("ConsoleSink can be correctly loaded") {
+    LastOptions.clear()
+    val df = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+
+    val cq = df.writeStream
+      .format("console")
+      .option("checkpointLocation", newMetadataDir)
+      .trigger(ProcessingTime(2.seconds))
+      .start()
+
+    cq.awaitTermination(2000L)
+  }
+
+  test("prevent all column partitioning") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      intercept[AnalysisException] {
+        spark.range(10).writeStream
+          .outputMode("append")
+          .partitionBy("id")
+          .format("parquet")
+          .start(path)
+      }
+    }
+  }
+}
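
For readers following the diff, the suite above exercises the pattern below end to end. The sketch is illustrative only and is not part of this commit: the socket source, console sink, checkpoint path, and query name are placeholders.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("readStream-writeStream-sketch").getOrCreate()

// Build a streaming DataFrame through the new readStream entry point.
val lines = spark.readStream
  .format("socket")                 // placeholder source
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Start the query through the new writeStream entry point.
val query = lines.writeStream
  .format("console")                // ConsoleSink, as loaded in the test above
  .outputMode("append")             // write only newly arrived rows each trigger
  .queryName("socket-to-console")   // must be unique among active queries
  .option("checkpointLocation", "/tmp/sketch-checkpoint")   // placeholder path
  .start()

query.awaitTermination()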

http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
new file mode 100644
index 0000000..98e57b3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.test
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+object LastOptions {
+
+  var parameters: Map[String, String] = null
+  var schema: Option[StructType] = null
+  var saveMode: SaveMode = null
+
+  def clear(): Unit = {
+    parameters = null
+    schema = null
+    saveMode = null
+  }
+}
+
+
+/** Dummy provider. */
+class DefaultSource
+  extends RelationProvider
+  with SchemaRelationProvider
+  with CreatableRelationProvider {
+
+  case class FakeRelation(sqlContext: SQLContext) extends BaseRelation {
+    override def schema: StructType = StructType(Seq(StructField("a", StringType)))
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: StructType
+    ): BaseRelation = {
+    LastOptions.parameters = parameters
+    LastOptions.schema = Some(schema)
+    FakeRelation(sqlContext)
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]
+    ): BaseRelation = {
+    LastOptions.parameters = parameters
+    LastOptions.schema = None
+    FakeRelation(sqlContext)
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    LastOptions.parameters = parameters
+    LastOptions.schema = None
+    LastOptions.saveMode = mode
+    FakeRelation(sqlContext)
+  }
+}
+
+
+class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext {
+
+  private def newMetadataDir =
+    Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("writeStream cannot be called on non-streaming datasets") {
+    val e = intercept[AnalysisException] {
+      spark.read
+        .format("org.apache.spark.sql.test")
+        .load()
+        .writeStream
+        .start()
+    }
+    Seq("'writeStream'", "only", "streaming Dataset/DataFrame").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+  }
+
+  test("resolve default source") {
+    spark.read
+      .format("org.apache.spark.sql.test")
+      .load()
+      .write
+      .format("org.apache.spark.sql.test")
+      .save()
+  }
+
+  test("resolve full class") {
+    spark.read
+      .format("org.apache.spark.sql.test.DefaultSource")
+      .load()
+      .write
+      .format("org.apache.spark.sql.test")
+      .save()
+  }
+
+  test("options") {
+    val map = new java.util.HashMap[String, String]
+    map.put("opt3", "3")
+
+    val df = spark.read
+        .format("org.apache.spark.sql.test")
+        .option("opt1", "1")
+        .options(Map("opt2" -> "2"))
+        .options(map)
+        .load()
+
+    assert(LastOptions.parameters("opt1") == "1")
+    assert(LastOptions.parameters("opt2") == "2")
+    assert(LastOptions.parameters("opt3") == "3")
+
+    LastOptions.clear()
+
+    df.write
+      .format("org.apache.spark.sql.test")
+      .option("opt1", "1")
+      .options(Map("opt2" -> "2"))
+      .options(map)
+      .save()
+
+    assert(LastOptions.parameters("opt1") == "1")
+    assert(LastOptions.parameters("opt2") == "2")
+    assert(LastOptions.parameters("opt3") == "3")
+  }
+
+  test("save mode") {
+    val df = spark.read
+      .format("org.apache.spark.sql.test")
+      .load()
+
+    df.write
+      .format("org.apache.spark.sql.test")
+      .mode(SaveMode.ErrorIfExists)
+      .save()
+    assert(LastOptions.saveMode === SaveMode.ErrorIfExists)
+  }
+
+  test("paths") {
+    val df = spark.read
+      .format("org.apache.spark.sql.test")
+      .option("checkpointLocation", newMetadataDir)
+      .load("/test")
+
+    assert(LastOptions.parameters("path") == "/test")
+
+    LastOptions.clear()
+
+    df.write
+      .format("org.apache.spark.sql.test")
+      .option("checkpointLocation", newMetadataDir)
+      .save("/test")
+
+    assert(LastOptions.parameters("path") == "/test")
+  }
+
+  test("test different data types for options") {
+    val df = spark.read
+      .format("org.apache.spark.sql.test")
+      .option("intOpt", 56)
+      .option("boolOpt", false)
+      .option("doubleOpt", 6.7)
+      .load("/test")
+
+    assert(LastOptions.parameters("intOpt") == "56")
+    assert(LastOptions.parameters("boolOpt") == "false")
+    assert(LastOptions.parameters("doubleOpt") == "6.7")
+
+    LastOptions.clear()
+    df.write
+      .format("org.apache.spark.sql.test")
+      .option("intOpt", 56)
+      .option("boolOpt", false)
+      .option("doubleOpt", 6.7)
+      .option("checkpointLocation", newMetadataDir)
+      .save("/test")
+
+    assert(LastOptions.parameters("intOpt") == "56")
+    assert(LastOptions.parameters("boolOpt") == "false")
+    assert(LastOptions.parameters("doubleOpt") == "6.7")
+  }
+
+  test("check jdbc() does not support partitioning or bucketing") {
+    val df = spark.read.text(Utils.createTempDir(namePrefix = "text").getCanonicalPath)
+
+    var w = df.write.partitionBy("value")
+    var e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("jdbc", "partitioning").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+
+    w = df.write.bucketBy(2, "value")
+    e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("jdbc", "bucketing").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+  }
+
+  test("prevent all column partitioning") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      intercept[AnalysisException] {
+        spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
+      }
+      intercept[AnalysisException] {
+        spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
+      }
+    }
+  }
+}
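
The batch suite above pins down the non-streaming entry points that stay on read/write after the refactoring. A minimal sketch of that path, again illustrative only, with placeholder format, paths, and column:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("batch-read-write-sketch").getOrCreate()

// Batch reads stay on DataFrameReader via spark.read.
val people = spark.read
  .format("json")
  .load("/tmp/people.json")         // placeholder input path

// Batch writes stay on DataFrameWriter via df.write.
people.write
  .format("parquet")
  .mode("error")                    // SaveMode.ErrorIfExists, as checked in the "save mode" test
  .partitionBy("country")           // assumes the input has a country column
  .save("/tmp/people.parquet")      // placeholder output path

// Calling people.writeStream here would throw AnalysisException, matching
// the "writeStream cannot be called on non-streaming datasets" test.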

