spark-reviews mailing list archives

From tdas <...@git.apache.org>
Subject [GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Date Wed, 10 Jan 2018 00:11:15 GMT
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20096#discussion_r160552160
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -977,20 +971,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
         }
       }
     
    -  test("stress test for failOnDataLoss=false") {
    -    val reader = spark
    -      .readStream
    -      .format("kafka")
    -      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    -      .option("kafka.metadata.max.age.ms", "1")
    -      .option("subscribePattern", "failOnDataLoss.*")
    -      .option("startingOffsets", "earliest")
    -      .option("failOnDataLoss", "false")
    -      .option("fetchOffset.retryIntervalMs", "3000")
    -    val kafka = reader.load()
    -      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    -      .as[(String, String)]
    -    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
    +  protected def startStream(ds: Dataset[Int]) = {
    --- End diff --
    
    I think this refactoring is not needed. `startStream()` is not used anywhere other than in this test, so I don't see the point of moving it out of the test.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

