kafka-dev mailing list archives

From "Eno Thereska (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5315) Streams exception w/ partially processed record corrupts state store
Date Wed, 24 May 2017 16:17:04 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023155#comment-16023155 ]

Eno Thereska commented on KAFKA-5315:
-------------------------------------

[~mjsax] isn't this a fundamental problem that exactly-once and transactions are supposed
to solve? Falls into the general category of "produce to a bunch of topics + state store"
atomically. What am I missing?

> Streams exception w/ partially processed record corrupts state store
> --------------------------------------------------------------------
>
>                 Key: KAFKA-5315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: Mathieu Fenniak
>            Assignee: Matthias J. Sax
>
> When processing a topic record in a Kafka Streams KTable, the record is inserted into
the state store before being forwarded to downstream processors, and may persist in the
state store even if downstream processing fails due to an exception.  The persisted state
store record may later affect any attempt to restart processing after the exception.
> Specific example series of events in a simple topology: a single KTable source, group
by a field in the value, aggregate that adds up another field, output to a topic --
> 1. A new record (record A) is received by the source KTable, and put in the KTable RocksDB
state store.
> 2. While processing record A, an exception prevents producing to Kafka (e.g.,
a TimeoutException: "Failed to update metadata after 60000 ms").
> 3. The stream thread throws an unhandled exception and stops.
> 4. The state stores are closed and flushed.  Record A is now in the local state store.
> 5. The consumer group rebalances.
> 6. A different thread, in the same process, on the same host, picks up the task.
> 7. New thread initializes its state store for the KTable, but it's on the same host as
the original thread, so it still contains the k/v for record A.
> 8. New thread resumes consuming at the last committed offset, which is before record
A.
> 9. When processing record A, the new thread looks up record A's key and reads the value
that was written to the state store in step #1.
> 10. The repartition map receives a Change with both an oldValue and a newValue, and
forwards a Change(null, v) and a Change(v, null).
> 11. The aggregation ends up both subtracting and adding the value of record A, resulting
in an incorrect & persistent output.
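
The sequence of events above can be reproduced with a small, self-contained simulation. This is only a sketch in plain Python and does not use the Kafka Streams API: `run_task`, `simulate`, `ProduceTimeout`, and the record fields `group` and `amount` are hypothetical names chosen for illustration. It assumes the local store survives the crash while the committed offset does not advance, as described in steps 4 to 8.

```python
from collections import defaultdict


class ProduceTimeout(Exception):
    """Hypothetical stand-in for the TimeoutException in step 2."""


def run_task(records, store, totals, committed, fail_on=None):
    """Process records starting at the last committed offset.

    store     : local KTable state store (key -> value); survives a thread crash
    totals    : downstream aggregate (group -> sum)
    committed : dict holding the last committed offset
    fail_on   : offset at which forwarding downstream fails (None = never)
    """
    for offset in range(committed["offset"], len(records)):
        key, value = records[offset]
        old = store.get(key)       # steps 1 and 9: look up any prior value for the key
        store[key] = value         # step 1: the store is updated before forwarding
        if offset == fail_on:
            raise ProduceTimeout() # step 2: forwarding fails, offset is not committed
        if old is not None:
            totals[old["group"]] -= old["amount"]  # step 10: subtract the old value
        totals[value["group"]] += value["amount"]  # step 10: add the new value
        committed["offset"] = offset + 1


def simulate(crash):
    """Return the aggregate for group g1 after processing record A."""
    records = [("A", {"group": "g1", "amount": 5})]
    store, totals, committed = {}, defaultdict(int), {"offset": 0}
    if crash:
        try:
            run_task(records, store, totals, committed, fail_on=0)
        except ProduceTimeout:
            pass  # steps 3 to 7: the thread stops, the store stays on the same host
    run_task(records, store, totals, committed)  # step 8: resume before record A
    return totals["g1"]


print(simulate(crash=False))  # 5
print(simulate(crash=True))   # 0
```

Without the failure the aggregate for `g1` is 5. With the failure, the retry sees record A's own value as the old value, so the aggregate subtracts and adds 5 and ends at 0, even though the first attempt never reached the aggregation.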



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
