spark-reviews mailing list archives

From gaborgsomogyi <...@git.apache.org>
Subject [GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Date Wed, 05 Sep 2018 13:43:06 GMT
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22138#discussion_r215274150
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer(
         }
       }
     
    -  /** Create a new consumer and reset cached states */
    -  private def resetConsumer(): Unit = {
    -    consumer.close()
    -    consumer = createConsumer
    -    fetchedData.reset()
    +  /**
    +   * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be
    +   * empty if the Kafka consumer fetches some messages but all of them are not visible messages
    +   * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`).
    +   *
    +   * @throws OffsetOutOfRangeException if `offset` is out of range.
    +   * @throws TimeoutException if the consumer position is not changed after polling. It means the
    +   *                          consumer polls nothing before timeout.
    +   */
    +  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
    +    val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
    +    fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
    +  }
    +
    +  private def ensureConsumerAvailable(): Unit = {
    +    if (consumer == null) {
    --- End diff --
    
    Same here.
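The `fetchData` doc comment in the diff describes three outcomes: a batch that may be empty because every polled record is invisible (a transaction marker, or an aborted record under `read_committed`), an out-of-range offset, and a timeout when the position does not advance. The Scala sketch below models those outcomes against an in-memory fake so they can be exercised without a broker. Every type in it (`Record`, `FakePartition`, `FakeConsumer`, `CachedConsumer`, and both exception classes) is hypothetical and is neither Spark's `InternalKafkaConsumer` nor the Kafka client API. Its `ensureConsumerAvailable` only illustrates lazily creating a consumer when the field is `null`; the real method body is cut off in the diff above and is not reproduced here.

```scala
// Hypothetical stand-ins for illustration only; not Spark or Kafka classes.
final case class Record(offset: Long, value: String, visible: Boolean)

final class OffsetOutOfRangeError(msg: String) extends RuntimeException(msg)
final class PollTimeoutError(msg: String) extends RuntimeException(msg)

// In-memory "partition": a vector of records ordered by offset.
final class FakePartition(val records: Vector[Record]) {
  def earliest: Long = records.headOption.map(_.offset).getOrElse(0L)
  def latest: Long = records.lastOption.map(_.offset + 1).getOrElse(0L)
}

// Hypothetical consumer: returns only visible records plus the position after the poll.
final class FakeConsumer(partition: FakePartition, maxPollRecords: Int) {
  def fetch(offset: Long): (List[Record], Long) = {
    if (offset < partition.earliest || offset > partition.latest)
      throw new OffsetOutOfRangeError(s"offset $offset is out of range")
    val polled = partition.records.filter(_.offset >= offset).take(maxPollRecords)
    // The position advances past every polled record, visible or not.
    val offsetAfterPoll = polled.lastOption.map(_.offset + 1).getOrElse(offset)
    if (offsetAfterPoll == offset)
      throw new PollTimeoutError(s"position did not move from $offset")
    (polled.filter(_.visible).toList, offsetAfterPoll)
  }
}

// Holder that creates its consumer lazily and can drop it (e.g. after an error).
final class CachedConsumer(newConsumer: () => FakeConsumer) {
  private var consumer: FakeConsumer = null
  var created: Int = 0              // number of consumers built so far
  var fetched: List[Record] = Nil   // visible records from the last successful poll
  var nextOffset: Long = -1L        // position after the last successful poll

  private def ensureConsumerAvailable(): Unit = {
    if (consumer == null) {
      consumer = newConsumer()
      created += 1
    }
  }

  def fetchData(offset: Long): Unit = {
    ensureConsumerAvailable()
    val (records, offsetAfterPoll) = consumer.fetch(offset)
    fetched = records
    nextOffset = offsetAfterPoll
  }

  def release(): Unit = { consumer = null }
}

// Offsets 2 and 3 are invisible, so polling from 2 yields an empty batch
// while the position still advances to 4.
val partition = new FakePartition(Vector(
  Record(0, "a", visible = true),
  Record(1, "b", visible = true),
  Record(2, "commit-marker", visible = false),
  Record(3, "aborted", visible = false)
))
val cached = new CachedConsumer(() => new FakeConsumer(partition, maxPollRecords = 2))
cached.fetchData(2)
println(s"${cached.fetched.size} visible records, next offset ${cached.nextOffset}")
```

Because the empty batch still moves `nextOffset` forward, a caller can distinguish "nothing visible yet" from "nothing polled at all"; only the latter surfaces as a timeout in this model.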


---


