kafka-dev mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: RocksDB flushing issue on 0.10.2 streams
Date Wed, 05 Jul 2017 08:27:29 GMT
Thanks for the updates Greg. There were some minor changes around this in
0.11.0 to make it less likely to happen, but we've only ever seen the
locking fail in the event of a rebalance. When everything is running, state
dirs shouldn't be deleted while they are in use, as the lock acquisition will fail.
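The lock-before-delete pattern described above can be illustrated with a plain `java.nio` file lock. This is an illustrative sketch only, not Kafka's actual `StateDirectory` code; the class and method names are hypothetical:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockBeforeDelete {
    // Try to lock the state dir's lock file; delete only if we got the lock.
    // Sketch of the idea: a dir in use keeps its lock, so cleanup skips it.
    static boolean deleteIfUnused(Path lockFile) throws IOException {
        try (FileChannel ch = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock lock;
            try {
                lock = ch.tryLock();
            } catch (OverlappingFileLockException e) {
                lock = null; // another thread in this JVM holds the lock
            }
            if (lock == null) {
                return false; // in use: leave the directory alone
            }
            try {
                // ... delete the obsolete state directory here ...
                return true;
            } finally {
                lock.release();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path lockFile = Files.createTempFile("state-dir", ".lock");
        // Nobody holds the lock, so deletion would proceed.
        System.out.println(deleteIfUnused(lockFile)); // prints "true"
        // While a task holds the lock, deletion is skipped.
        try (FileChannel held = FileChannel.open(lockFile, StandardOpenOption.WRITE);
             FileLock busy = held.lock()) {
            System.out.println(deleteIfUnused(lockFile)); // prints "false"
        }
    }
}
```

Note that within one JVM an overlapping lock attempt throws `OverlappingFileLockException` rather than returning null, which is why the sketch catches it and treats it as "in use".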


On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfodor@gmail.com> wrote:

> I can report that setting state.cleanup.delay.ms to a very large value
> (effectively disabling it) works around the issue. It seems that the state
> store cleanup process can somehow get out ahead of another task that still
> thinks it should be writing to the state store/flushing it. In my test
> runs, this does not seem to be happening during a rebalancing event, but
> after the cluster is stable.
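The workaround Greg describes, effectively disabling state-directory cleanup by setting `state.cleanup.delay.ms` very high, can be sketched with plain `java.util.Properties`. The class name is hypothetical and the config key is taken literally from this thread; in a real application the properties would be passed to the `KafkaStreams` constructor:

```java
import java.util.Properties;

public class CleanupDelayWorkaround {
    // Build a streams config with the cleanup delay effectively disabled,
    // as described above. In a real app you would also set application.id,
    // bootstrap.servers, etc., then pass this to new KafkaStreams(...).
    static Properties workaroundProps() {
        Properties props = new Properties();
        // Effectively never clean up obsolete state directories.
        props.put("state.cleanup.delay.ms", Long.toString(Long.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(
            workaroundProps().getProperty("state.cleanup.delay.ms"));
    }
}
```

This avoids the race at the cost of never reclaiming disk space from obsolete state directories, so it is a mitigation rather than a fix.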
>
> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfodor@gmail.com> wrote:
>
> > Upon another run, I see the same error occur during a rebalance, so
> > either my log was showing a rebalance or there is a shared underlying
> > issue with state stores.
> >
> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfodor@gmail.com> wrote:
> >
> >> Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.
> >>
> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfodor@gmail.com> wrote:
> >>
> >>> I've nuked the nodes this happened on, but the job had been running
> >>> for about 5-10 minutes across 5 nodes before this happened. Does the
> >>> log show a rebalance was happening? It looks to me like the standby
> >>> task was just committing as part of normal operations.
> >>>
> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian.guy@gmail.com> wrote:
> >>>
> >>>> Hi Greg,
> >>>>
> >>>> Obviously a bit difficult to read the RocksDBException, but my guess
> >>>> is it is because the state directory gets deleted right before the
> >>>> flush happens:
> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO  [StreamThread-21:StateDirectory@213]
> >>>> - Deleting obsolete state directory 0_10 for task 0_10
> >>>>
> >>>> Yes, it looks like it is possibly the same bug as KAFKA-5070.
> >>>>
> >>>> It looks like your application is constantly rebalancing during store
> >>>> initialization, which may be the reason this bug comes about (there is
> >>>> a chance that the state dir lock is released, so when the thread tries
> >>>> to remove the stale state directory it is able to get the lock). You
> >>>> probably want to configure `max.poll.interval.ms` to be a reasonably
> >>>> large value (I think we default to Integer.MAX_VALUE in 0.10.2.1). You
> >>>> can also try setting `state.cleanup.delay.ms` to a higher value
> >>>> (default is 10 minutes) to try and avoid it happening during a
> >>>> rebalance (I know this isn't a fix, but it will make it less likely to
> >>>> happen).
> >>>>
> >>>> Thanks,
> >>>> Damian
> >>>>
> >>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfodor@gmail.com> wrote:
> >>>>
> >>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to Kafka
> >>>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job that
> >>>> > has 4 state stores and runs across a few hundred partitions, and as
> >>>> > part of load testing the job we are trying to reload our data out of
> >>>> > Kafka into a test db. The result is we are able to load about 4M
> >>>> > tuples and then this error pops up on all of the stream nodes
> >>>> > simultaneously. There are 4 RocksDB stores in question, and there
> >>>> > are lots of these errors, which takes the job down. This bug does
> >>>> > *not* seem to occur on 0.10.1.
> >>>> >
> >>>> > A similar error was mentioned here:
> >>>> > https://issues.apache.org/jira/browse/KAFKA-5070
> >>>> >
> >>>> > Full log attached.
> >>>> >
> >>>> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10]
> >>>> > Failed to flush state store session-id-start-events
> >>>> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
> >>>> >     at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
> >>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
> >>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>>> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store session-id-start-events
> >>>> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
> >>>> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
> >>>> >     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> >>>> >     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> >>>> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
> >>>> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >>>> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
> >>>> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
> >>>> >     ... 6 more
> >>>> > Caused by: org.rocksdb.RocksDBException: v
> >>>> >     at org.rocksdb.RocksDB.flush(Native Method)
> >>>> >     at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> >>>> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
> >>>> >     ... 13 more
> >>>> >
> >>>> >
> >>>>
> >>>
> >>>
> >>
> >
>
