spark-reviews mailing list archives

From koeninger <...@git.apache.org>
Subject [GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Date Wed, 21 Feb 2018 05:22:34 GMT
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169537541
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
             s"Failed to get records for $groupId $topic $partition $offset after polling
for $timeout")
           record = buffer.next()
           assert(record.offset == offset,
    -        s"Got wrong record for $groupId $topic $partition even after seeking to offset
$offset")
    +        s"Got wrong record for $groupId $topic $partition even after seeking to offset
$offset " +
    +          s"got offset ${record.offset} instead. If this is a compacted topic, consider
enabling " +
    +          "spark.streaming.kafka.allowNonConsecutiveOffsets"
    +      )
         }
     
         nextOffset = offset + 1
         record
       }
     
    +  /**
    +   * Start a batch on a compacted topic
    +   */
    +  def compactedStart(offset: Long, timeout: Long): Unit = {
    +    logDebug(s"compacted start $groupId $topic $partition starting $offset")
    +    // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
    +    if (offset != nextOffset) {
    +      logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
    +      seek(offset)
    +      poll(timeout)
    +    }
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   */
    +  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
    +    if (!buffer.hasNext()) { poll(timeout) }
    +    assert(buffer.hasNext(),
    --- End diff --
    
    That's a "shouldn't happen unless the topicpartition or broker is gone" kind of thing. Semantically I could see that being more like require than assert, but I don't have a strong opinion.
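
For readers outside the thread, the require-vs-assert distinction in Scala's Predef: assert flags a broken internal invariant, throws AssertionError, and can be compiled away with scalac's -Xdisable-assertions; require flags a violated precondition, throws IllegalArgumentException, and is never elided. A minimal sketch of the difference (illustrative names only, not taken from the patch):

    // Illustrative sketch of require vs assert; names are hypothetical.
    object RequireVsAssert {
      // require: a precondition on the caller or the environment; throws
      // IllegalArgumentException and is always kept in compiled code.
      def nextRecord(buffer: Iterator[Long]): Long = {
        require(buffer.hasNext, "no records: topicpartition or broker may be gone")
        buffer.next()
      }

      // assert: an internal invariant of this code; throws AssertionError
      // and can be removed entirely with scalac -Xdisable-assertions.
      def checkOffset(expected: Long, actual: Long): Unit =
        assert(actual == expected, s"expected offset $expected, got $actual")
    }

Under that reading, require fits a "the outside world broke" failure and assert fits a "this class has a bug" failure, which is the semantic point being weighed above.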


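For anyone trying the patch, the new error message points at the spark.streaming.kafka.allowNonConsecutiveOffsets flag. A minimal, hypothetical sketch of enabling it for a DStreams job; only the config key comes from the diff above, while the app name and batch interval are made up:

    // Hypothetical setup; only the config key is taken from the patch.
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("compacted-topic-job") // hypothetical name
      .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
    val ssc = new StreamingContext(conf, Seconds(5))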