kafka-jira mailing list archives

From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
Date Fri, 24 Nov 2017 04:31:01 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264921#comment-16264921 ]

Matthias J. Sax commented on KAFKA-6269:
----------------------------------------

Thanks for reporting this. The exception {{Log end offset of entity-B-exists-0
should not change while restoring: old end offset 4750539, current offset 4751388}}
means that a thread started to restore a state store from a changelog topic and the end offset
moved during the restore. This should never happen unless the task was migrated to another
thread and that other thread is writing into the changelog topic. If the task is not migrated,
the restoring thread is the only one that is "allowed" to write into the changelog, and it
does not write while it is restoring.

What I can think of is a bug related to an optimization we do: because you read KTables
directly from source topics, there is no additional changelog topic, as the source topic can
be used to recreate the state (the error message shows that the source topic is used for store
recovery). Thus, an upstream processor that writes to the source topic can of course append
data to this topic. We actually have a guard for this case, but we changed some code here and
may have introduced a bug so that the guard no longer works properly: the restoring thread
"thinks" it reads a changelog topic (while it actually reads a source topic) during recovery
and fails even though it shouldn't. I'll try to reproduce this locally and get back to you.
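
To make the guard concrete, here is a minimal sketch of the kind of check described above.
It is not the actual code in StoreChangelogReader: the class RestoreGuardSketch, the method
checkEndOffset, and the sourceTopics set are hypothetical names used only for illustration,
and the real implementation may be structured quite differently.

```java
import java.util.Collections;
import java.util.Set;

public class RestoreGuardSketch {

    /**
     * Hypothetical guard, NOT Kafka's actual implementation.
     * Throws if the end offset moved during restore, unless the partition
     * belongs to a source topic that upstream processors may append to.
     */
    public static void checkEndOffset(String topic,
                                      int partition,
                                      long oldEndOffset,
                                      long currentEndOffset,
                                      Set<String> sourceTopics) {
        if (currentEndOffset == oldEndOffset) {
            return; // nothing was appended while restoring
        }
        if (sourceTopics.contains(topic)) {
            return; // source topic reused for recovery: growth is expected
        }
        throw new IllegalStateException(String.format(
            "Log end offset of %s-%d should not change while restoring: "
                + "old end offset %d, current offset %d",
            topic, partition, oldEndOffset, currentEndOffset));
    }

    public static void main(String[] args) {
        Set<String> sourceTopics = Collections.singleton("entity-B-exists");

        // Source topic: the offset moved, but the guard lets restoration continue.
        checkEndOffset("entity-B-exists", 0, 4750539L, 4751388L, sourceTopics);

        // Same offsets without the source-topic exemption: the check fails.
        try {
            checkEndOffset("entity-B-exists", 0, 4750539L, 4751388L,
                           Collections.<String>emptySet());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the set of source topics is empty or stale at restore time, the second branch is never
taken and every append by an upstream processor looks like a migrated task, which would match
the behaviour in the report. That is only a guess until I can reproduce it.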

> KTable state restore fails after rebalance
> ------------------------------------------
>
>                 Key: KAFKA-6269
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6269
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andreas Schroeder
>             Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, valuemapper, and joiner
> code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                           sourceTopic: String,
>                           existsTopic: String,
>                           valueSerde: Serde[V],
>                           valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>       .withKeySerde(Serdes.String())
>       .withValueSerde(valueSerde)
>       .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions,
> so there is a total of 4 x 30 = 120 partitions to consume. The initial launch of the
> processors works fine, but killing one processor and letting it re-join leads to some
> faulty behaviour in the stream threads.
> First, the total number of assigned partitions over all processor machines is larger than
> 120 (sometimes 157, sometimes just 132), so the partition / task assignment seems to assign
> the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly with the
> error message 'Detected a task that got migrated to another thread.' We gave the processor
> half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with
> Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to
> another thread. This implies that this thread missed a rebalance and dropped out of the consumer
> group. Trying to rejoin the consumer group now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
> > StreamsTask taskId: 1_0
> > > 	ProcessorTopology:
> > 		KSTREAM-SOURCE-0000000008:
> > 			topics:		[entity-A-exists]
> > 			children:	[KTABLE-SOURCE-0000000009]
> > 		KTABLE-SOURCE-0000000009:
> > 			states:		[entity-A-exists-persisted]
> > 			children:	[KTABLE-JOINTHIS-0000000011]
> > 		KTABLE-JOINTHIS-0000000011:
> > 			states:		[entity-B-exists-persisted]
> > 			children:	[KTABLE-MERGE-0000000010]
> > 		KTABLE-MERGE-0000000010:
> > 			states:		[entity-A-joined-with-entity-B]
> > 		KSTREAM-SOURCE-0000000003:
> > 			topics:		[entity-B-exists]
> > 			children:	[KTABLE-SOURCE-0000000004]
> > 		KTABLE-SOURCE-0000000004:
> > 			states:		[entity-B-exists-persisted]
> > 			children:	[KTABLE-JOINOTHER-0000000012]
> > 		KTABLE-JOINOTHER-0000000012:
> > 			states:		[entity-A-exists-persisted]
> > 			children:	[KTABLE-MERGE-0000000010]
> > 		KTABLE-MERGE-0000000010:
> > 			states:		[entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-0, entity-B-exists-0]
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> That one surprises me: the KTable state store entity-B-exists-persisted is rebuilt from
> entity-B-exists, which of course can change while the rebuild is happening, since the topic
> entity-B-exists is fed by another stream thread.
> Another one, very similar:
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-A-exists-24 should not change while restoring: old end offset 6483978, current offset 6485108
> > StreamsTask taskId: 1_24
> > > 	ProcessorTopology:
> > 		KSTREAM-SOURCE-0000000008:
> > 			topics:		[entity-A-exists]
> > 			children:	[KTABLE-SOURCE-0000000009]
> > 		KTABLE-SOURCE-0000000009:
> > 			states:		[entity-A-exists-persisted]
> > 			children:	[KTABLE-JOINTHIS-0000000011]
> > 		KTABLE-JOINTHIS-0000000011:
> > 			states:		[entity-B-exists-persisted]
> > 			children:	[KTABLE-MERGE-0000000010]
> > 		KTABLE-MERGE-0000000010:
> > 			states:		[entity-A-joined-with-entity-B]
> > 		KSTREAM-SOURCE-0000000003:
> > 			topics:		[entity-B-exists]
> > 			children:	[KTABLE-SOURCE-0000000004]
> > 		KTABLE-SOURCE-0000000004:
> > 			states:		[entity-B-exists-persisted]
> > 			children:	[KTABLE-JOINOTHER-0000000012]
> > 		KTABLE-JOINOTHER-0000000012:
> > 			states:		[entity-A-exists-persisted]
> > 			children:	[KTABLE-MERGE-0000000010]
> > 		KTABLE-MERGE-0000000010:
> > 			states:		[entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-24, entity-B-exists-24]
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> Again, the topic entity-A-exists is fed by another stream thread.
> We saw around 60000 such errors per minute, as the stream threads continuously try to
> recover and fail.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
