kafka-jira mailing list archives

From "Damian Guy (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6350) File descriptors leak with persistent KeyValueStore
Date Tue, 12 Dec 2017 12:49:06 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16287550#comment-16287550 ]

Damian Guy commented on KAFKA-6350:
-----------------------------------

Hi [~gheorghealingabriel], are you closing the iterators returned from the state store in your
punctuate method? Can you share some code?
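For reference, the failure mode being asked about can be sketched with a self-contained stand-in. The class names below (IteratorLeakDemo, FakeIterator) are illustrative, not the real Kafka Streams API; in the real API, org.apache.kafka.streams.state.KeyValueIterator extends Closeable, and the iterator returned by all() pins RocksDB SST file descriptors until close() is called:

```java
import java.util.Iterator;

public class IteratorLeakDemo {
    // Models the number of open SST file descriptors held by live iterators.
    static int openIterators = 0;

    // Stand-in for KeyValueIterator: opening it pins a resource,
    // close() releases it.
    static class FakeIterator implements Iterator<String>, AutoCloseable {
        private int remaining = 3;
        FakeIterator() { openIterators++; }
        public boolean hasNext() { return remaining > 0; }
        public String next() { remaining--; return "key-" + remaining; }
        @Override public void close() { openIterators--; }
    }

    // Leaky pattern: the iterator is fully consumed but never closed,
    // so the underlying resource stays open.
    static void leakyPunctuate() {
        FakeIterator it = new FakeIterator();
        while (it.hasNext()) { it.next(); }
        // missing it.close() -> descriptor leaks
    }

    // Correct pattern: try-with-resources guarantees close(),
    // even if the loop body throws.
    static void safePunctuate() {
        try (FakeIterator it = new FakeIterator()) {
            while (it.hasNext()) { it.next(); }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) leakyPunctuate();
        System.out.println("after leaky punctuates: " + openIterators); // 100
        openIterators = 0;
        for (int i = 0; i < 100; i++) safePunctuate();
        System.out.println("after safe punctuates: " + openIterators);  // 0
    }
}
```

With one punctuation every 30 seconds per task, an unclosed iterator per invocation would accumulate descriptors at roughly the rate the report describes.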

> File descriptors leak with persistent KeyValueStore
> ---------------------------------------------------
>
>                 Key: KAFKA-6350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6350
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1, 1.0.0
>            Reporter: Alin Gheorghe
>
> When using the low-level Processor API with persistent KV stores, we observed a continuous increase in the number of SST files on disk. The file descriptors remain open until the configured OS limit is reached (100k in our case), at which point Kafka Streams crashes with a "Too many open files" exception. In our case this happens regularly after about 17 hours of uptime. The commit interval is set to 5 seconds and we never call commit() from our own code.
> Our topology consists of 1 source topic, 7 processors, 2 KV stores and 2 sink topics. The retention policy is set to 2 days and the topics have 25 partitions.
> Using the punctuation mechanism in Kafka Streams 1.0.0, we perform a cleanup every 30 seconds that checks for keys that have not been updated for at least 20 minutes. The KV stores hold temporary user sessions, which last for 5 minutes and receive about 50 updates (user actions) each.
> 2017-12-11 10:57:03 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 54
> {code}
> 2017-12-11 11:45:31 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 6742
> {code}
> We use the following state store APIs: *all*, *get*, *delete*, *put*.
> When switching to in-memory state stores this obviously doesn't happen.
> We have also tried to override the RocksDB *max_open_files* parameter, which defaults
to -1, but the configured value seems to be ignored and RocksDB surpasses that threshold.

> Sometimes the application crashes with a different error, which may or may not be related.
We will file a separate Jira issue if it turns out to be unrelated:
> {code:none}
> RocksDBExceptionJni::ThrowNew/StatusJni - Error: unexpected exception!
> 2017-12-12 11:37:25,758 [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1]
WARN  org.apache.kafka.streams.KafkaStreams - stream-client [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444]All
stream threads have died. The instance will be in error state and should be closed.
> 2017-12-12 11:37:25,758 [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1]
ERROR com.X.Y.Z.ApiStreaming$ - [ApiStreaming] Thread 12 died with exception task [0_257]
Failed to flush state store eventQueueStore. Shutting down the entire Kafka Streams process
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_257] Failed to flush
state store eventQueueStore
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
> 	at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> Caused by: java.lang.IllegalArgumentException: Illegal value provided for SubCode.
> 	at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
> 	at org.rocksdb.Status.<init>(Status.java:30)
> 	at org.rocksdb.RocksDB.flush(Native Method)
> 	at org.rocksdb.RocksDB.flush(RocksDB.java:1743)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:435)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:428)
> 	at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:84)
> 	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:268)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:153)
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
> 	... 14 more
> {code}
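On the *max_open_files* point in the quoted report: in Kafka Streams, RocksDB options are overridden through a RocksDBConfigSetter registered under the `rocksdb.config.setter` config. A minimal sketch follows (BoundedOpenFilesConfigSetter is a hypothetical class name; it assumes Kafka Streams 1.0.0 and the bundled RocksDB JNI classes on the classpath, so it is a config fragment rather than a standalone program). Note also that *max_open_files* applies per RocksDB instance, and Streams opens one instance per store per partition, so a per-instance cap does not bound the process-wide descriptor count — which may be why the override appears to be ignored:

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Hypothetical example: caps open SST files for each RocksDB instance.
public class BoundedOpenFilesConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName,
                          final Options options,
                          final Map<String, Object> configs) {
        options.setMaxOpenFiles(300); // the default, -1, means unlimited
    }
}

// Registered via the streams configuration, e.g.:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
//           BoundedOpenFilesConfigSetter.class);
```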



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
