kafka-jira mailing list archives

From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
Date Wed, 28 Mar 2018 00:03:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416494#comment-16416494 ]

Guozhang Wang commented on KAFKA-6437:

KAFKA-6720 reports an interesting related issue: for topics involved in a join, an exception
is thrown if a topic does not exist yet. I think it is not a complete duplicate of this ticket,
and I'd like to summarize the "inconsistent" behavior that we are facing today:

1) For join operations directly on user topics (i.e. no reshuffling is added, as Streams assumes
the input topics are already partitioned by key), we require the user topics to pre-exist; if
they do not, we throw TopologyBuilderException.
2) For join operations on repartition topics, since these are not available at the assignment
phase, we "assume" the repartition topics will be created and become available, and hence we do
not check whether the source topics are available. When a source topic is missing, and hence
no data is sent to the repartition topics at all, Streams will hang (this is what this
JIRA reported).
3) For stateless operations, if a source topic is missing, Streams will continue but log
a warning.

So I think the actual fix should be in three steps:

1) We can [collect all external topics' num.partitions|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L425-L437]
at the very beginning of the assign() phase, and log a warning entry if the metadata for any
of these topics cannot be found.

2) In step one we do not need to [query the metadata|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L341]
again; we can read it directly from the collected map of available num.partitions.

3) Then finally, in ensureCopartitioning, if the metadata cannot be found we skip the [checking
co-partition phase|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L665]
but log another warning that "since the topic is not found, we will skip the co-partition
validation .."
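
To make the three steps concrete, here is a rough, dependency-free sketch. It is not the actual StreamPartitionAssignor code: the class name, both method signatures, the in-memory WARNINGS list (standing in for a logger), the plain Map standing in for cluster metadata, the topic names, and the use of IllegalStateException in place of the Streams exception are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names and signatures are illustrative, not Kafka's.
final class MissingTopicCheckSketch {

    // Stand-in for a logger so the example has no external dependencies.
    static final List<String> WARNINGS = new ArrayList<>();

    // Steps 1 and 2: look up each external topic's partition count once, warn
    // about topics whose metadata is missing, and return the map of known
    // counts so later steps can reuse it instead of querying metadata again.
    static Map<String, Integer> collectPartitionCounts(List<String> externalTopics,
                                                       Map<String, Integer> clusterMetadata) {
        Map<String, Integer> counts = new HashMap<>();
        for (String topic : externalTopics) {
            Integer partitions = clusterMetadata.get(topic);
            if (partitions == null) {
                WARNINGS.add("No metadata found for source topic " + topic);
            } else {
                counts.put(topic, partitions);
            }
        }
        return counts;
    }

    // Step 3: skip co-partition validation (with a warning) when any topic in
    // the group is unknown; otherwise require equal partition counts.
    // Returns true if validation ran, false if it was skipped.
    static boolean ensureCopartitioning(List<String> copartitionGroup,
                                        Map<String, Integer> counts) {
        for (String topic : copartitionGroup) {
            if (!counts.containsKey(topic)) {
                WARNINGS.add("Topic " + topic
                        + " not found; skipping co-partition validation");
                return false;
            }
        }
        int expected = -1;
        for (String topic : copartitionGroup) {
            int partitions = counts.get(topic);
            if (expected == -1) {
                expected = partitions;
            } else if (partitions != expected) {
                // Stand-in for the exception Streams throws on a mismatch.
                throw new IllegalStateException(
                        "Topics not co-partitioned: " + copartitionGroup);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Only the right-hand join input exists; the left one is missing.
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("right-input", 4);
        List<String> joinInputs = Arrays.asList("left-input", "right-input");

        Map<String, Integer> counts = collectPartitionCounts(joinInputs, metadata);
        boolean validated = ensureCopartitioning(joinInputs, counts);

        System.out.println("validated=" + validated);
        WARNINGS.forEach(System.out::println);
    }
}
```

The point of returning the collected map from the first method is that the co-partition check then works purely from that map, so a missing topic produces warnings at assignment time rather than a silent hang or an exception.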

> Streams does not warn about missing input topics, but hangs
> -----------------------------------------------------------
>                 Key: KAFKA-6437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6437
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>         Environment: Single client on single node broker
>            Reporter: Chris Schwarzfischer
>            Assignee: Mariam John
>            Priority: Minor
>              Labels: newbie
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it hangs "in the middle" of the topology (at …00009, see below). Only parts of the intermediate topics are created (up to …00009)
> When the missing input topic is created, the streams application resumes processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
> 	ProcessorTopology:
> 		KSTREAM-SOURCE-0000000011:
> 			topics:		[mystreams_app-KTABLE-AGGREGATE-STATE-STORE-0000000009-repartition]
> 			children:	[KTABLE-AGGREGATE-0000000012]
> 		KTABLE-AGGREGATE-0000000012:
> 			states:		[KTABLE-AGGREGATE-STATE-STORE-0000000009]
> 			children:	[KTABLE-TOSTREAM-0000000020]
> 		KTABLE-TOSTREAM-0000000020:
> 			children:	[KSTREAM-SINK-0000000021]
> 		KSTREAM-SINK-0000000021:
> 			topic:		data_udr_month_customer_aggregration
> 		KSTREAM-SOURCE-0000000017:
> 			topics:		[mystreams_app-KSTREAM-MAP-0000000014-repartition]
> 			children:	[KSTREAM-LEFTJOIN-0000000018]
> 		KSTREAM-LEFTJOIN-0000000018:
> 			states:		[KTABLE-AGGREGATE-STATE-STORE-0000000009]
> 			children:	[KSTREAM-SINK-0000000019]
> 		KSTREAM-SINK-0000000019:
> 			topic:		data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-0000000014-repartition-0, mystreams_app-KTABLE-AGGREGATE-STATE-STORE-0000000009-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the missing input topic. This preprocessing won't happen without the topic, creating a huge backlog of data.
> *Fix*
> Issue a `warn` or `error` level message at start to inform about the missing topic and its consequences.

This message was sent by Atlassian JIRA