kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-6269) KTable state restore fails after rebalance
Date Thu, 30 Nov 2017 01:56:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272013#comment-16272013
] 

Matthias J. Sax edited comment on KAFKA-6269 at 11/30/17 1:55 AM:
------------------------------------------------------------------

About the workaround. You could mask {{null}} with a dummy value: {{stream.mapValue(/*replace
null with custom NULL*).groupByKey.aggregate()}}.


was (Author: mjsax):
About the workaround. You could mask {{null}} with a dummy value: `stream.mapValue(/*replace
null with custom NULL*).groupByKey.aggregate()}}.

> KTable state restore fails after rebalance
> ------------------------------------------
>
>                 Key: KAFKA-6269
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6269
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andreas Schroeder
>            Priority: Blocker
>             Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, valuemapper, and joiner
code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                           sourceTopic: String,
>                           existsTopic: String,
>                           valueSerde: Serde[V],
>                           valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>       .withKeySerde(Serdes.String())
>       .withValueSerde(valueSerde)
>       .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value !=
null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a,
b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions
so that there is a total of 4 x 30 = 120 partitions to consume. The initial launch of the
processor works fine, but when killing one processor and letting him re-join the stream threads
leads to some faulty behaviour.
> Fist, the total number of assigned partitions over all processor machines is larger than
120 (sometimes 157, sometimes just 132), so the partition / task assignment seems to assign
the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly with the
error message of 'Detected a task that got migrated to another thread.' We gave the processor
half an hour to recover; usually, rebuilding the KTable states take around 20 seconds (with
Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to
another thread. This implies that this thread missed a rebalance and dropped out of the consumer
group. Trying to rejoin the consumer group now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0
should not change while restoring: old end offset 4750539, current offset 4751388
> > StreamsTask taskId: 1_0
> > > 	ProcessorTopology:
> > 		KSTREAM-SOURCE-0000000008:
> > 			topics:		[entity-A-exists]
> > 			children:	[KTABLE-SOURCE-0000000009]
> > 		KTABLE-SOURCE-0000000009:
> > 			states:		[entity-A-exists-persisted]
> > 			children:	[KTABLE-JOINTHIS-0000000011]
> > 		KTABLE-JOINTHIS-0000000011:
> > 			states:		[entity-B-exists-persisted]
> > 			children:	[KTABLE-MERGE-0000000010]
> > 		KTABLE-MERGE-0000000010:
> > 			states:		[entity-A-joined-with-entity-B]
> > 		KSTREAM-SOURCE-0000000003:
> > 			topics:		[entity-B-exists]
> > 			children:	[KTABLE-SOURCE-0000000004]
> > 		KTABLE-SOURCE-0000000004:
> > 			states:		[entity-B-exists-persisted]
> > 			children:	[KTABLE-JOINOTHER-0000000012]
> > 		KTABLE-JOINOTHER-0000000012:
> > 			states:		[entity-A-exists-persisted]
> > 			children:	[KTABLE-MERGE-0000000010]
> > 		KTABLE-MERGE-0000000010:
> > 			states:		[entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-0, entity-B-exists-0]
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> That one surprises me: the KTable state store entity-B-exists-persisted is rebuilt from
entity-B-exists that of course can change while the rebuild is happening, since it the topic
entity-B-exists is fed by another stream thread.
> Another one, very similar:
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-A-exists-24
should not change while restoring: old end offset 6483978, current offset 6485108
> > StreamsTask taskId: 1_24
> > > 	ProcessorTopology:
> > 		KSTREAM-SOURCE-0000000008:
> > 			topics:		[entity-A-exists]
> > 			children:	[KTABLE-SOURCE-0000000009]
> > 		KTABLE-SOURCE-0000000009:
> > 			states:		[entity-A-exists-persisted]
> > 			children:	[KTABLE-JOINTHIS-0000000011]
> > 		KTABLE-JOINTHIS-0000000011:
> > 			states:		[entity-B-exists-persisted]
> > 			children:	[KTABLE-MERGE-0000000010]
> > 		KTABLE-MERGE-0000000010:
> > 			states:		[entity-A-joined-with-entity-B]
> > 		KSTREAM-SOURCE-0000000003:
> > 			topics:		[entity-B-exists]
> > 			children:	[KTABLE-SOURCE-0000000004]
> > 		KTABLE-SOURCE-0000000004:
> > 			states:		[entity-B-exists-persisted]
> > 			children:	[KTABLE-JOINOTHER-0000000012]
> > 		KTABLE-JOINOTHER-0000000012:
> > 			states:		[entity-A-exists-persisted]
> > 			children:	[KTABLE-MERGE-0000000010]
> > 		KTABLE-MERGE-0000000010:
> > 			states:		[entity-A-joined-with-entity-B]
> > Partitions [entity-A-exists-24, entity-B-exists-24]
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> Again, the topic entity-A-exists is fed by another stream thread.
> We saw around 60000 such errors per minute, as the stream threads continuously try to
recover and fail.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message