spark-commits mailing list archives

From zsxw...@apache.org
Subject spark git commit: [SPARK-18671][SS][TEST] Added tests to ensure stability of all Structured Streaming log formats
Date Tue, 06 Dec 2016 21:05:35 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 ace4079c5 -> d20e0d6b8


[SPARK-18671][SS][TEST] Added tests to ensure stability of all Structured Streaming log formats

## What changes were proposed in this pull request?

To be able to restart StreamingQueries across Spark versions, we have already made the logs
(offset log, file source log, file sink log) use JSON. We should add tests with actual JSON
files in the Spark source tree so that any incompatible change in reading the logs is caught
immediately. This PR adds tests for FileStreamSourceLog, FileStreamSinkLog, and OffsetSeqLog.
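
As an illustration of the test pattern (a minimal sketch, assuming a ScalaTest suite like
the ones in the diff below; the resource name and expected JSON here are hypothetical):

    import scala.io.Source

    import org.apache.spark.sql.execution.streaming.SerializedOffset

    test("read Spark 2.1.0 log format (sketch)") {
      // Load a file that was serialized by Spark 2.1.0 and checked into the
      // test resources, then verify the current reader still parses it.
      val url = getClass.getResource("/structured-streaming/some-log-version-2.1.0.txt")
      val str = Source.fromFile(url.toURI).mkString
      assert(SerializedOffset(str.trim).json === """{"example":0}""")
    }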

## How was this patch tested?
New unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16128 from tdas/SPARK-18671.

(cherry picked from commit 1ef6b296d7cd2d93cdfd5f54940842d6bb915ce0)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d20e0d6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d20e0d6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d20e0d6b

Branch: refs/heads/branch-2.1
Commit: d20e0d6b8919eccaab9ae7db94ba80fdfac03c9d
Parents: ace4079
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Tue Dec 6 13:05:22 2016 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Tue Dec 6 13:05:31 2016 -0800

----------------------------------------------------------------------
 dev/.rat-excludes                               |  1 +
 .../apache/spark/sql/kafka010/JsonUtils.scala   |  9 +++++-
 .../sql/kafka010/KafkaSourceOffsetSuite.scala   | 12 ++++++++
 .../file-sink-log-version-2.1.0/7.compact       |  9 ++++++
 .../file-sink-log-version-2.1.0/8               |  3 ++
 .../file-sink-log-version-2.1.0/9               |  2 ++
 .../file-source-log-version-2.1.0/2.compact     |  4 +++
 .../file-source-log-version-2.1.0/3             |  2 ++
 .../file-source-log-version-2.1.0/4             |  2 ++
 .../file-source-offset-version-2.1.0.txt        |  1 +
 .../kafka-source-offset-version-2.1.0.txt       |  1 +
 .../offset-log-version-2.1.0/0                  |  4 +++
 .../streaming/FileStreamSinkLogSuite.scala      | 21 ++++++++++++++
 .../execution/streaming/OffsetSeqLogSuite.scala | 16 +++++++++++
 .../sql/streaming/FileStreamSourceSuite.scala   | 30 ++++++++++++++++++--
 15 files changed, 114 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/dev/.rat-excludes
----------------------------------------------------------------------
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index a3efdde..6be1c72 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -102,3 +102,4 @@ org.apache.spark.scheduler.ExternalClusterManager
 .Rbuildignore
 org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
 spark-warehouse
+structured-streaming/*

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 13d7170..868edb5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -81,7 +81,14 @@ private object JsonUtils {
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
     val result = new HashMap[String, HashMap[Int, Long]]()
-    partitionOffsets.foreach { case (tp, off) =>
+    implicit val ordering = new Ordering[TopicPartition] {
+      override def compare(x: TopicPartition, y: TopicPartition): Int = {
+        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+      }
+    }
+    val partitions = partitionOffsets.keySet.toSeq.sorted  // sort for more determinism
+    partitions.foreach { tp =>
+        val off = partitionOffsets(tp)
         val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
         parts += tp.partition -> off
         result += tp.topic -> parts
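
The ordering introduced above sorts TopicPartitions by (topic, partition) before they are
inserted into the map, making the serialized JSON more deterministic (per the code comment)
so tests can compare it as a string. A standalone sketch of the same ordering, for
illustration only:

    import org.apache.kafka.common.TopicPartition

    // TopicPartitions compare by (topic, partition), giving a stable
    // iteration order for serialization.
    val byTopicThenPartition =
      Ordering.by[TopicPartition, (String, Int)](tp => (tp.topic, tp.partition))
    val parts = Seq(
      new TopicPartition("topic2", 0),
      new TopicPartition("topic1", 1),
      new TopicPartition("topic1", 0))
    assert(parts.sorted(byTopicThenPartition).map(tp => (tp.topic, tp.partition)) ==
      Seq(("topic1", 0), ("topic1", 1), ("topic2", 0)))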

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 881018f..c8326ff 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -89,4 +89,16 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
         Array(0 -> batch0Serialized, 1 -> batch1Serialized))
     }
   }
+
+  test("read Spark 2.1.0 log format") {
+    val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
+    assert(KafkaSourceOffset(offset) ===
+      KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
+  }
+
+  private def readFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
new file mode 100644
index 0000000..e1ec8a7
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/7.compact
@@ -0,0 +1,9 @@
+v1
+{"path":"/a/b/0","size":1,"isDir":false,"modificationTime":1,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/1","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/2","size":200,"isDir":false,"modificationTime":200,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/3","size":300,"isDir":false,"modificationTime":300,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/4","size":400,"isDir":false,"modificationTime":400,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/5","size":500,"isDir":false,"modificationTime":500,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/6","size":600,"isDir":false,"modificationTime":600,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/7","size":700,"isDir":false,"modificationTime":700,"blockReplication":1,"blockSize":100,"action":"add"}

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8 b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8
new file mode 100644
index 0000000..e798980
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/8
@@ -0,0 +1,3 @@
+v1
+{"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"}

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9 b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9
new file mode 100644
index 0000000..42fb0ee
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-sink-log-version-2.1.0/9
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/9","size":900,"isDir":false,"modificationTime":900,"blockReplication":3,"blockSize":200,"action":"add"}

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact
new file mode 100644
index 0000000..95f78bb
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/2.compact
@@ -0,0 +1,4 @@
+v1
+{"path":"/a/b/0","timestamp":1480730949000,"batchId":0}
+{"path":"/a/b/1","timestamp":1480730950000,"batchId":1}
+{"path":"/a/b/2","timestamp":1480730950000,"batchId":2}

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3 b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3
new file mode 100644
index 0000000..2caa597
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/3
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/3","timestamp":1480730950000,"batchId":3}

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4 b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4
new file mode 100644
index 0000000..e54b943
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-log-version-2.1.0/4
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/4","timestamp":1480730951000,"batchId":4}

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
new file mode 100644
index 0000000..51b4008
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/file-source-offset-version-2.1.0.txt
@@ -0,0 +1 @@
+345

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt
new file mode 100644
index 0000000..6410031
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/kafka-source-offset-version-2.1.0.txt
@@ -0,0 +1 @@
+{"topic1":{"0":456,"1":789},"topic2":{"0":0}}

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0 b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
new file mode 100644
index 0000000..fe5c1d4
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/offset-log-version-2.1.0/0
@@ -0,0 +1,4 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}
+0
+{"topic-0":{"0":1}}
\ No newline at end of file
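
The offset log's v1 layout is visible above: the first line is the version marker, the
second line is the batch metadata JSON, and each remaining line is one source's serialized
offset ("0" is a file source's LongOffset; the JSON object is a Kafka source offset). A
rough illustration of that decomposition (a sketch, not OffsetSeqLog's actual parser; the
path is assumed relative to the test resources):

    import scala.io.Source

    // Split the v1 offset log file into its three sections.
    val lines = Source.fromFile("offset-log-version-2.1.0/0").getLines().toVector
    val version  = lines.head        // "v1"
    val metadata = lines(1)          // {"batchWatermarkMs":0,"batchTimestampMs":1480981499528}
    val offsets  = lines.drop(2)     // Vector("0", """{"topic-0":{"0":1}}""")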

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index e046fee..8a21b76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -185,6 +185,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
+  test("read Spark 2.1.0 log format") {
+    assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
+      // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
+      SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION)
+    ))
+  }
+
   /**
   * Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
    * in SinkFileStatus.
@@ -206,4 +221,10 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       f(sinkLog)
     }
   }
+
+  private def readFromResource(dir: String): Seq[SinkFileStatus] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString)
+    log.allFiles()
+  }
 }
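
Note on the commented-out first entry: batch file 8 in the resources records a delete
action for /a/b/0, so allFiles() (which replays 7.compact plus batches 8 and 9) no longer
returns it. A rough reduction of that replay rule (a sketch under that assumption, not the
real FileStreamSinkLog code):

    case class Entry(path: String, action: String)

    // Later delete actions drop earlier add actions for the same path.
    val replayed = Seq(
      Entry("/a/b/0", "add"), Entry("/a/b/8", "add"), Entry("/a/b/0", "delete"))
    val live = replayed.foldLeft(Map.empty[String, Entry]) {
      case (acc, e) if e.action == "add" => acc + (e.path -> e)
      case (acc, e)                      => acc - e.path
    }
    assert(live.keySet == Set("/a/b/8"))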

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index d3a83ea..d139efa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -69,4 +69,20 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
         Array(0 -> batch0Serialized, 1 -> batch1Serialized))
     }
   }
+
+  test("read Spark 2.1.0 log format") {
+    val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
+    assert(batchId === 0)
+    assert(offsetSeq.offsets === Seq(
+      Some(SerializedOffset("0")),
+      Some(SerializedOffset("""{"topic-0":{"0":1}}"""))
+    ))
+    assert(offsetSeq.metadata === Some(OffsetSeqMetadata(0L, 1480981499528L)))
+  }
+
+  private def readFromResource(dir: String): (Long, OffsetSeq) = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new OffsetSeqLog(spark, input.toString)
+    log.getLatest().get
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d20e0d6b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 8256c63..ff1f3e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
 
-import scala.collection.mutable
-
 import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -1022,6 +1021,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
     assert(options.maxFilesPerTrigger == Some(1))
   }
+
+  test("FileStreamSource offset - read Spark 2.1.0 log format") {
+    val offset = readOffsetFromResource("file-source-offset-version-2.1.0.txt")
+    assert(LongOffset.convert(offset) === Some(LongOffset(345)))
+  }
+
+  test("FileStreamSourceLog - read Spark 2.1.0 log format") {
+    assert(readLogFromResource("file-source-log-version-2.1.0") === Seq(
+      FileEntry("/a/b/0", 1480730949000L, 0L),
+      FileEntry("/a/b/1", 1480730950000L, 1L),
+      FileEntry("/a/b/2", 1480730950000L, 2L),
+      FileEntry("/a/b/3", 1480730950000L, 3L),
+      FileEntry("/a/b/4", 1480730951000L, 4L)
+    ))
+  }
+
+  private def readLogFromResource(dir: String): Seq[FileEntry] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString)
+    log.allFiles()
+  }
+
+  private def readOffsetFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str.trim)
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {

