kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Greg Fodor <gfo...@gmail.com>
Subject Re: RocksDB flushing issue on 0.10.2 streams
Date Tue, 04 Jul 2017 18:35:01 GMT
Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.

On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfodor@gmail.com> wrote:

> I've nuked the nodes this happened on, but the job had been running for
> about 5-10 minutes across 5 nodes before this happened. Does the log show a
> rebalance was happening? It looks to me like the standby task was just
> committing as part of normal operations.
>
> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian.guy@gmail.com> wrote:
>
>> Hi Greg,
>>
>> Obviously a bit difficult to read the RocksDBException, but my guess is it
>> is because the state directory gets deleted right before the flush
>> happens:
>> 2017-07-04 10:54:46,829 [myid:] - INFO  [StreamThread-21:StateDirector
>> y@213]
>> - Deleting obsolete state directory 0_10 for task 0_10
>>
>> Yes it looks like it is possibly the same bug as KAFKA-5070.
>>
>> It looks like your application is constantly rebalancing during store
>> intialization, which may be the reason this bug comes about (there is a
>> chance that the state dir lock is released so when the thread tries to
>> removes the stale state directory it is able to get the lock). You
>> probably
>> want to configure `max.poll.interval.ms` to be a reasonably large value
>> ( i
>> think we default to Integer.MAX_VALUE in 0.10.2.1). You can also try
>> setting `state.cleanup.delay.ms` to a higher value (default is 10
>> minutes),
>> to try and avoid it happening during a rebalance (I know this isn't a fix,
>> but will make it less likely to happen).
>>
>> Thanks,
>> Damian
>>
>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfodor@gmail.com> wrote:
>>
>> > Hi all, we are working on upgrading our jobs from 0.10.0 to use Kafka
>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job that has
>> 4
>> > state stores and runs across a few hundred partitions, and as part of
>> load
>> > testing the job we are trying to reload our data out of kafka into a
>> test
>> > db. The result is we are able to load about 4M tuples and then this
>> error
>> > pops up on all of the stream nodes simultaneously. There are 4 rocksdb
>> > stores in question and there are lots of these errors which takes it
>> down.
>> > This bug *does* not seem to occur on 0.10.1.
>> >
>> > A similar error was mentioned here:
>> > https://issues.apache.org/jira/browse/KAFKA-5070
>> >
>> > Full log attached.
>> >
>> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10]
>> > Failed to flush state store session-id-start-events
>> > at
>> > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> anager.flush(ProcessorStateManager.java:337)
>> > at
>> > org.apache.kafka.streams.processor.internals.StandbyTask.
>> commit(StandbyTask.java:94)
>> > at
>> > org.apache.kafka.streams.processor.internals.StreamThread.
>> commitOne(StreamThread.java:807)
>> > at
>> > org.apache.kafka.streams.processor.internals.StreamThread.
>> commitAll(StreamThread.java:797)
>> > at
>> > org.apache.kafka.streams.processor.internals.StreamThread.
>> maybeCommit(StreamThread.java:769)
>> > at
>> > org.apache.kafka.streams.processor.internals.StreamThread.
>> runLoop(StreamThread.java:647)
>> > at
>> > org.apache.kafka.streams.processor.internals.StreamThread.
>> run(StreamThread.java:361)
>> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException:
>> Error
>> > while executing flush from store session-id-start-events
>> > at
>> > org.apache.kafka.streams.state.internals.RocksDBStore.flushI
>> nternal(RocksDBStore.java:354)
>> > at
>> > org.apache.kafka.streams.state.internals.RocksDBStore.flush(
>> RocksDBStore.java:345)
>> > at
>> > org.apache.kafka.streams.state.internals.WrappedStateStore$A
>> bstractWrappedStateStore.flush(WrappedStateStore.java:80)
>> > at
>> > org.apache.kafka.streams.state.internals.WrappedStateStore$A
>> bstractWrappedStateStore.flush(WrappedStateStore.java:80)
>> > at
>> > org.apache.kafka.streams.state.internals.MeteredKeyValueStor
>> e$6.run(MeteredKeyValueStore.java:92)
>> > at
>> > org.apache.kafka.streams.processor.internals.StreamsMetricsI
>> mpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> > at
>> > org.apache.kafka.streams.state.internals.MeteredKeyValueStor
>> e.flush(MeteredKeyValueStore.java:186)
>> > at
>> > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> anager.flush(ProcessorStateManager.java:335)
>> > ... 6 more
>> > Caused by: org.rocksdb.RocksDBException: v
>> > at org.rocksdb.RocksDB.flush(Native Method)
>> > at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>> > at
>> > org.apache.kafka.streams.state.internals.RocksDBStore.flushI
>> nternal(RocksDBStore.java:352)
>> > ... 13 more
>> >
>> >
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message