spark-reviews mailing list archives

From zsxwing <...@git.apache.org>
Subject [GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...
Date Fri, 16 Mar 2018 18:29:13 GMT
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20745#discussion_r175178609
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
---
    @@ -405,4 +406,52 @@ class FileStreamSinkSuite extends StreamTest {
           }
         }
       }
    +
    +  test("SPARK-23288 writing and checking output metrics") {
    +    Seq("parquet", "orc", "text", "json").foreach { format =>
    +      val inputData = MemoryStream[String]
    +      val df = inputData.toDF()
    +
    +      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
    +      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
    +
    +      var query: StreamingQuery = null
    +
    +      var numTasks = 0
    +      var recordsWritten: Long = 0L
    +      var bytesWritten: Long = 0L
    +      try {
    +        spark.sparkContext.addSparkListener(new SparkListener() {
    +          override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    +            val outputMetrics = taskEnd.taskMetrics.outputMetrics
    +            recordsWritten += outputMetrics.recordsWritten
    +            bytesWritten += outputMetrics.bytesWritten
    +            numTasks += 1
    +          }
    +        })
    +
    +        query =
    +          df.writeStream
    +            .option("checkpointLocation", checkpointDir)
    +            .format(format)
    +            .start(outputDir)
    +
    +        inputData.addData("1", "2", "3")
    +        inputData.addData("4", "5")
    +
    +        failAfter(streamingTimeout) {
    +          query.processAllAvailable()
    +        }
    +
    +        assert(numTasks === 2)
    --- End diff --
    
    I would just check `numTasks > 0` since it depends on the configurations and the number of CPU cores.
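    
    For example, a rough sketch of the relaxed checks (reusing the `numTasks`, `recordsWritten`,
    and `bytesWritten` counters from the diff above; the exact byte count is format-dependent,
    so it is only checked for positivity):
    
    ```scala
    // The task count depends on partitioning config and available CPU cores,
    // so only require that at least one task reported output metrics.
    assert(numTasks > 0)
    // Five rows were added across the two batches, so the record count is fixed.
    assert(recordsWritten === 5)
    // Byte counts differ per format; just check that something was written.
    assert(bytesWritten > 0)
    ```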


---

