spark-issues mailing list archives

From "Yuval Itzchakov (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors
Date Sun, 05 Aug 2018 10:52:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569432#comment-16569432 ]

Yuval Itzchakov commented on SPARK-24987:
-----------------------------------------

Is there any chance this will make it in time for 2.3.2? This is a critical fix for us.

> Kafka Cached Consumer Leaking File Descriptors
> ----------------------------------------------
>
>                 Key: SPARK-24987
>                 URL: https://issues.apache.org/jira/browse/SPARK-24987
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0, 2.3.1
>         Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>  
>            Reporter: Yuval Itzchakov
>            Assignee: Yuval Itzchakov
>            Priority: Critical
>             Fix For: 2.4.0
>
>
> Setup:
>  * Spark 2.3.1
>  * Java 1.8.0 (112)
>  * Standalone Cluster Manager
>  * 3 Nodes, 1 Executor per node.
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
via KafkaDataConsumer.acquire.
> In some situations (I have been debugging this but have not yet found the root cause),
> cached consumers remain "in use" for the lifetime of the task and are never released.
> This can be identified by the following line of the stack trace:
> at org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460)
> Which points to:
> {code:java}
> } else if (existingInternalConsumer.inUse) {
>   // If consumer is already cached but is currently in use, then return a new consumer
>   NonCachedKafkaDataConsumer(newInternalConsumer)
> {code}
> This means the existing consumer created for that `TopicPartition` is still marked as in use
> for some reason. Oddly, this also shows up for very old tasks that have already finished
> successfully.
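
To illustrate the behaviour described above, here is a minimal sketch of an "in use" consumer cache. It is not the Spark implementation: `TopicPartitionKey`, `SketchConsumer`, and `SketchConsumerCache` are hypothetical names, and closing a non-cached consumer on release is an assumption made for the example. It only shows how a cached entry that is never released forces every later `acquire` for the same partition to create a fresh consumer.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; these are not Spark classes.
final case class TopicPartitionKey(topic: String, partition: Int)

final class SketchConsumer(val key: TopicPartitionKey) {
  var inUse: Boolean = false
  var closed: Boolean = false
  def close(): Unit = { closed = true }
}

object SketchConsumerCache {
  private val cache = mutable.Map.empty[TopicPartitionKey, SketchConsumer]

  // Returns the consumer plus a flag saying whether it came from the cache.
  def acquire(key: TopicPartitionKey): (SketchConsumer, Boolean) = synchronized {
    cache.get(key) match {
      case Some(existing) if existing.inUse =>
        // Cached consumer is busy: hand out a new, non-cached consumer.
        (new SketchConsumer(key), false)
      case Some(existing) =>
        existing.inUse = true
        (existing, true)
      case None =>
        val created = new SketchConsumer(key)
        created.inUse = true
        cache.put(key, created)
        (created, true)
    }
  }

  // Assumption: cached consumers stay open for reuse, non-cached ones are closed.
  def release(consumer: SketchConsumer, cached: Boolean): Unit = synchronized {
    if (cached) consumer.inUse = false else consumer.close()
  }
}

object LeakDemo {
  // Acquire once without releasing, then acquire the same partition again.
  def run(): (Boolean, Boolean) = {
    val key = TopicPartitionKey("events", 0)
    val (_, firstCached) = SketchConsumerCache.acquire(key)   // never released
    val (second, secondCached) = SketchConsumerCache.acquire(key)
    SketchConsumerCache.release(second, secondCached)
    (firstCached, secondCached)
  }
}
```

In this sketch, the second `acquire` falls into the "already cached but in use" branch because the first consumer was never released, which matches the symptom in the stack trace.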
> I traced this leak using file leak detector, attached to the running Executor JVM process.
> I dumped the list of open file descriptors, which [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d];
> the majority of them are epoll file descriptors used by Kafka consumers, indicating that the
> consumers are not being closed.
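
As a rough cross-check that does not need an attached agent, the descriptors of a JVM on Linux can also be inspected from inside the process. The sketch below is not what was used for the gist above; `FdSnapshot` is a hypothetical helper, and it assumes a Linux `/proc` filesystem where epoll descriptors resolve to a link target containing `eventpoll`.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._
import scala.util.Try

// Hypothetical helper for illustration; assumes Linux and a readable /proc.
object FdSnapshot {
  // Resolve every entry in the given fd directory to its symlink target.
  // Entries that vanish or cannot be read are skipped.
  def linkTargets(fdDir: Path): List[String] = {
    val stream = Files.list(fdDir)
    try {
      stream.iterator().asScala
        .flatMap(p => Try(Files.readSymbolicLink(p).toString).toOption)
        .toList
    } finally stream.close()
  }

  // Count targets that look like epoll instances (assumed to contain "eventpoll").
  def countEpoll(targets: Seq[String]): Int =
    targets.count(_.contains("eventpoll"))

  // Snapshot of the current JVM's own descriptors.
  def currentProcessEpollCount(): Int =
    countEpoll(linkTargets(Paths.get("/proc/self/fd")))
}
```

Logging `FdSnapshot.currentProcessEpollCount()` periodically from an executor would show whether the number of epoll descriptors keeps growing across batches.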
>  Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .writeStream
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination(){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

