kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Damian Guy <damian....@gmail.com>
Subject Re: RocksDB flushing issue on 0.10.2 streams
Date Thu, 06 Jul 2017 11:43:45 GMT
Greg, what OS are you running on?
Are you able to reproduce this in a test at all?
For instance, based on what you described it would seem that i should be
able to start a streams app, wait for it to be up and running, run the
state dir cleanup, see it fail. However, i can't reproduce it.

On Wed, 5 Jul 2017 at 23:23 Damian Guy <damian.guy@gmail.com> wrote:

> Thanks Greg. I'll look into it more tomorrow. Just finding it difficult to
> reproduce in a test.
> Thanks for providing the sequence, gives me something to try and repo.
> Appreciated.
>
> Thanks,
> Damian
> On Wed, 5 Jul 2017 at 19:57, Greg Fodor <gfodor@gmail.com> wrote:
>
>> Also, the sequence of events is:
>>
>> - Job starts, rebalance happens, things run along smoothly.
>> - After 10 minutes (retrospectively) the cleanup task kicks on and removes
>> some directories
>> - Tasks immediately start failing when trying to flush their state stores
>>
>>
>>
>> On Wed, Jul 5, 2017 at 11:55 AM, Greg Fodor <gfodor@gmail.com> wrote:
>>
>> > The issue I am hitting is not the directory locking issues we've seen in
>> > the past. The issue seems to be, as you mentioned, that the state dir is
>> > getting deleted by the store cleanup process, but there are still tasks
>> > running that are trying to flush the state store. It seems more than a
>> > little scary given that right now it seems either a) there are tasks
>> > running that should have been re-assigned or b) the cleanup job is
>> removing
>> > state directories for currently running + assigned tasks (perhaps
>> during a
>> > rebalance there is a race condition?) I'm guessing there's probably a
>> more
>> > benign explanation, but that is what it looks like right now.
>> >
>> > On Wed, Jul 5, 2017 at 7:00 AM, Damian Guy <damian.guy@gmail.com>
>> wrote:
>> >
>> >> BTW - i'm trying to reproduce it, but not having much luck so far...
>> >>
>> >> On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian.guy@gmail.com> wrote:
>> >>
>> >> > Thans for the updates Greg. There were some minor changes around
>> this in
>> >> > 0.11.0 to make it less likely to happen, but we've only ever seen the
>> >> > locking fail in the event of a rebalance. When everything is running
>> >> state
>> >> > dirs shouldn't be deleted if they are being used as the lock will
>> fail.
>> >> >
>> >> >
>> >> > On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfodor@gmail.com> wrote:
>> >> >
>> >> >> I can report that setting state.cleanup.delay.ms to a very large
>> value
>> >> >> (effectively disabling it) works around the issue. It seems that
the
>> >> state
>> >> >> store cleanup process can somehow get out ahead of another task
that
>> >> still
>> >> >> thinks it should be writing to the state store/flushing it. In
my
>> test
>> >> >> runs, this does not seem to be happening during a rebalancing event,
>> >> but
>> >> >> after the cluster is stable.
>> >> >>
>> >> >> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfodor@gmail.com>
>> wrote:
>> >> >>
>> >> >> > Upon another run, I see the same error occur during a rebalance,
>> so
>> >> >> either
>> >> >> > my log was showing a rebalance or there is a shared underlying
>> issue
>> >> >> with
>> >> >> > state stores.
>> >> >> >
>> >> >> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfodor@gmail.com>
>> >> wrote:
>> >> >> >
>> >> >> >> Also, I am on 0.10.2.1, so poll interval was already set
to
>> >> MAX_VALUE.
>> >> >> >>
>> >> >> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfodor@gmail.com>
>> >> wrote:
>> >> >> >>
>> >> >> >>> I've nuked the nodes this happened on, but the job
had been
>> running
>> >> >> for
>> >> >> >>> about 5-10 minutes across 5 nodes before this happened.
Does the
>> >> log
>> >> >> show a
>> >> >> >>> rebalance was happening? It looks to me like the standby
task
>> was
>> >> just
>> >> >> >>> committing as part of normal operations.
>> >> >> >>>
>> >> >> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <
>> damian.guy@gmail.com>
>> >> >> wrote:
>> >> >> >>>
>> >> >> >>>> Hi Greg,
>> >> >> >>>>
>> >> >> >>>> Obviously a bit difficult to read the RocksDBException,
but my
>> >> guess
>> >> >> is
>> >> >> >>>> it
>> >> >> >>>> is because the state directory gets deleted right
before the
>> flush
>> >> >> >>>> happens:
>> >> >> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO
>> >> >> [StreamThread-21:StateDirector
>> >> >> >>>> y@213]
>> >> >> >>>> - Deleting obsolete state directory 0_10 for task
0_10
>> >> >> >>>>
>> >> >> >>>> Yes it looks like it is possibly the same bug
as KAFKA-5070.
>> >> >> >>>>
>> >> >> >>>> It looks like your application is constantly rebalancing
during
>> >> store
>> >> >> >>>> intialization, which may be the reason this bug
comes about
>> (there
>> >> >> is a
>> >> >> >>>> chance that the state dir lock is released so
when the thread
>> >> tries
>> >> >> to
>> >> >> >>>> removes the stale state directory it is able to
get the lock).
>> You
>> >> >> >>>> probably
>> >> >> >>>> want to configure `max.poll.interval.ms` to be
a reasonably
>> large
>> >> >> >>>> value ( i
>> >> >> >>>> think we default to Integer.MAX_VALUE in 0.10.2.1).
You can
>> also
>> >> try
>> >> >> >>>> setting `state.cleanup.delay.ms` to a higher value
(default
>> is 10
>> >> >> >>>> minutes),
>> >> >> >>>> to try and avoid it happening during a rebalance
(I know this
>> >> isn't a
>> >> >> >>>> fix,
>> >> >> >>>> but will make it less likely to happen).
>> >> >> >>>>
>> >> >> >>>> Thanks,
>> >> >> >>>> Damian
>> >> >> >>>>
>> >> >> >>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfodor@gmail.com>
>> wrote:
>> >> >> >>>>
>> >> >> >>>> > Hi all, we are working on upgrading our jobs
from 0.10.0 to
>> use
>> >> >> Kafka
>> >> >> >>>> > Streams 0.10.2.1 and are hitting a problem.
We have an ETL
>> job
>> >> that
>> >> >> >>>> has 4
>> >> >> >>>> > state stores and runs across a few hundred
partitions, and as
>> >> part
>> >> >> of
>> >> >> >>>> load
>> >> >> >>>> > testing the job we are trying to reload our
data out of kafka
>> >> into
>> >> >> a
>> >> >> >>>> test
>> >> >> >>>> > db. The result is we are able to load about
4M tuples and
>> then
>> >> this
>> >> >> >>>> error
>> >> >> >>>> > pops up on all of the stream nodes simultaneously.
There are
>> 4
>> >> >> rocksdb
>> >> >> >>>> > stores in question and there are lots of
these errors which
>> >> takes
>> >> >> it
>> >> >> >>>> down.
>> >> >> >>>> > This bug *does* not seem to occur on 0.10.1.
>> >> >> >>>> >
>> >> >> >>>> > A similar error was mentioned here:
>> >> >> >>>> > https://issues.apache.org/jira/browse/KAFKA-5070
>> >> >> >>>> >
>> >> >> >>>> > Full log attached.
>> >> >> >>>> >
>> >> >> >>>> > org.apache.kafka.streams.errors.ProcessorStateException:
task
>> >> >> [0_10]
>> >> >> >>>> > Failed to flush state store session-id-start-events
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> >> >> >>>> anager.flush(ProcessorStateManager.java:337)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.processor.internals.StandbyTask.com
>> >> >> >>>> mit(StandbyTask.java:94)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.co
>> >> >> >>>> mmitOne(StreamThread.java:807)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.co
>> >> >> >>>> mmitAll(StreamThread.java:797)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.ma
>> >> >> >>>> ybeCommit(StreamThread.java:769)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.ru
>> >> >> >>>> nLoop(StreamThread.java:647)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.ru
>> >> >> >>>> n(StreamThread.java:361)
>> >> >> >>>> > Caused by: org.apache.kafka.streams.error
>> >> s.ProcessorStateException:
>> >> >> >>>> Error
>> >> >> >>>> > while executing flush from store session-id-start-events
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flushI
>> >> >> >>>> nternal(RocksDBStore.java:354)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flush(
>> >> >> >>>> RocksDBStore.java:345)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.state.internals.WrappedStateStore$A
>> >> >> >>>> bstractWrappedStateStore.flush(WrappedStateStore.java:80)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.state.internals.WrappedStateStore$A
>> >> >> >>>> bstractWrappedStateStore.flush(WrappedStateStore.java:80)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.state.internals.MeteredKeyValueStor
>> >> >> >>>> e$6.run(MeteredKeyValueStore.java:92)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.processor.internals.StreamsMetricsI
>> >> >> >>>> mpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.state.internals.MeteredKeyValueStor
>> >> >> >>>> e.flush(MeteredKeyValueStore.java:186)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> >> >> >>>> anager.flush(ProcessorStateManager.java:335)
>> >> >> >>>> > ... 6 more
>> >> >> >>>> > Caused by: org.rocksdb.RocksDBException:
v
>> >> >> >>>> > at org.rocksdb.RocksDB.flush(Native Method)
>> >> >> >>>> > at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>> >> >> >>>> > at
>> >> >> >>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flushI
>> >> >> >>>> nternal(RocksDBStore.java:352)
>> >> >> >>>> > ... 13 more
>> >> >> >>>> >
>> >> >> >>>> >
>> >> >> >>>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>
>> >> >> >
>> >> >>
>> >> >
>> >>
>> >
>> >
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message