spark-reviews mailing list archives

From tdas <...@git.apache.org>
Subject [GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Date Fri, 09 Mar 2018 02:30:10 GMT
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20767#discussion_r173351789
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
---
    @@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
         }
       }
     
    -  def releaseKafkaConsumer(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): Unit = {
    -    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    +  private def releaseKafkaConsumer(
    +    topicPartition: TopicPartition,
    +    kafkaParams: ju.Map[String, Object]): Unit = {
    +    val key = new CacheKey(topicPartition, kafkaParams)
     
         synchronized {
           val consumer = cache.get(key)
           if (consumer != null) {
    -        consumer.inuse = false
    +        if (consumer.markedForClose) {
    +          consumer.close()
    +          cache.remove(key)
    +        } else {
    +          consumer.inuse = false
    +        }
           } else {
             logWarning(s"Attempting to release consumer that does not exist")
    --- End diff --
    
This should not be the case: we never remove a consumer from the cache while it is in use, so the scenario you mentioned should not happen.
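
To make the invariant in that reply concrete, here is a minimal, self-contained Scala sketch of the same release pattern. It is not the actual Spark code: ConsumerPool, PooledConsumer, evict, and release are hypothetical names, and println stands in for real logging. What it shows is that every cache mutation happens under a single lock, and an entry is only ever removed when its inuse flag is false.

    import scala.collection.mutable

    // Hypothetical stand-in for the cached Kafka consumer.
    final class PooledConsumer(val key: String) {
      @volatile var inuse = true            // true while a task is reading through it
      @volatile var markedForClose = false  // set when eviction is requested mid-use
      def close(): Unit = println(s"closing consumer for $key")
    }

    object ConsumerPool {
      private val cache = mutable.Map.empty[String, PooledConsumer]

      // Eviction: close immediately only if nobody is using the consumer;
      // otherwise mark it and let the releasing thread close it later.
      def evict(key: String): Unit = synchronized {
        cache.get(key).foreach { c =>
          if (c.inuse) c.markedForClose = true
          else { c.close(); cache.remove(key) }
        }
      }

      // Release: removals happen only here and in evict, both under this
      // lock and only when the consumer is not in use, so an in-use
      // consumer can never vanish from the cache before it is released.
      def release(key: String): Unit = synchronized {
        cache.get(key) match {
          case Some(c) if c.markedForClose =>
            c.close()
            cache.remove(key)
          case Some(c) =>
            c.inuse = false
          case None =>
            println(s"Attempting to release consumer that does not exist: $key")
        }
      }
    }

Because evict defers closing to the releasing thread whenever inuse is true, a consumer that is actively in use is never closed or removed out from under its owner, which is why the logWarning branch in the diff should be unreachable.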

