kafka-jira mailing list archives

From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly
Date Mon, 08 Jan 2018 22:31:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317240#comment-16317240 ]

Guozhang Wang commented on KAFKA-6398:

[~bjamet@isi.nc] I can reproduce the issue you mentioned, and I have a theory about its root
cause, which is reflected in the updated description of this JIRA. My plan is to first merge https://github.com/apache/kafka/pull/4384/files,
which fixes a minor issue but not this ticket as a whole, and then continue to
work on this ticket.

> Non-aggregation KTable generation operator does not construct value getter correctly
> ------------------------------------------------------------------------------------
>                 Key: KAFKA-6398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions:, 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: bug
> For any operator that generates a KTable, its {{valueGetterSupplier}} has three code paths:
> 1. If the operator is a KTable source operator, it uses its materialized state store for
the value getter (note that currently we always materialize the KTable source).
> 2. If the operator is an aggregation operator, its generated KTable should always
be materialized, so we just use its materialized state store.
> 3. Otherwise, we treat the value getter on a per-operator basis.
> For 3) above, what we SHOULD do is the following: if the generated KTable is materialized, the
value getter simply relies on its materialized state store to get the value; otherwise we
rely on the operator itself to define which parent's value getter to inherit and what
computational logic to apply on the fly to get the value. For example, for {{KTable#filter()}}
where {{Materialized}} is not specified, {{KTableFilterValueGetter}} just gets the value from the
parent's value getter and then applies the filter on the fly; in addition, we should let
downstream operators access the parent's materialized state store via {{connectProcessorAndStateStore}}.
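> The intended behavior for case 3) can be illustrated with a minimal sketch. This is a toy model only: {{ValueGetterSketch}}, its nested {{ValueGetter}} interface, and the {{fromStore}}, {{filtered}}, and {{forFilter}} helpers are hypothetical names introduced here for illustration, and plain maps stand in for state stores. It is not the Kafka Streams implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Toy model only: these types are hypothetical and are NOT the Kafka Streams internals.
final class ValueGetterSketch {

    /** Minimal stand-in for a value getter: looks up the current value for a key. */
    interface ValueGetter<K, V> {
        V get(K key);
    }

    /** Materialized case: read directly from the operator's own state store (a Map here). */
    static <K, V> ValueGetter<K, V> fromStore(Map<K, V> store) {
        return store::get;
    }

    /** Non-materialized case: read from the parent's getter and apply the filter on the fly. */
    static <K, V> ValueGetter<K, V> filtered(ValueGetter<K, V> parent, BiPredicate<K, V> predicate) {
        return key -> {
            V value = parent.get(key);
            return (value != null && predicate.test(key, value)) ? value : null;
        };
    }

    /** Uses the operator's own store if it is materialized (non-null), else derives from the parent. */
    static <K, V> ValueGetter<K, V> forFilter(Map<K, V> ownStore,
                                              ValueGetter<K, V> parent,
                                              BiPredicate<K, V> predicate) {
        return ownStore != null ? fromStore(ownStore) : filtered(parent, predicate);
    }

    public static void main(String[] args) {
        Map<String, Integer> parentStore = new HashMap<>();
        parentStore.put("a", 1);
        parentStore.put("b", 20);
        ValueGetter<String, Integer> parent = fromStore(parentStore);

        // The filter result is not materialized, so values are computed from the parent.
        ValueGetter<String, Integer> getter = forFilter(null, parent, (k, v) -> v > 10);
        System.out.println(getter.get("a")); // prints "null": the value fails the filter
        System.out.println(getter.get("b")); // prints "20": the value passes the filter
    }
}
```

> The sketch covers only the choice of value getter; granting downstream operators access to the parent's store is a separate step that it does not model.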
> However, the current code does not do this correctly: 1) it does not check whether the result
KTable is materialized, but always tries to use its parent's value getter, and 2) it
does not try to connect its parent's materialized store to the downstream operator. As a result,
operators such as {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}}
would result in a TopologyException when the topology is built. The following is an example:
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building:
StateStore null is not added yet.
> 	at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
> 	at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
> 	at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
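> One plausible reading of the message above is that the non-materialized filter result has no store name, so the join tries to connect to a store named null. The following toy model reproduces only that message shape. {{StoreConnectionSketch}} is a hypothetical class, the store name is made up, and the method name merely mirrors the one in the stack trace; this is not the real {{TopologyBuilder}}.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: a hypothetical store registry, NOT the real TopologyBuilder.
final class StoreConnectionSketch {
    private final Map<String, Object> stores = new HashMap<>();

    /** Registers a state store under the given name. */
    void addStateStore(String name) {
        stores.put(name, new Object());
    }

    /** Fails if the named store was never registered (HashMap tolerates a null key lookup). */
    void connectProcessorAndStateStore(String processor, String storeName) {
        if (!stores.containsKey(storeName)) {
            throw new IllegalStateException("StateStore " + storeName + " is not added yet.");
        }
    }

    public static void main(String[] args) {
        StoreConnectionSketch topology = new StoreConnectionSketch();
        topology.addStateStore("table-topic-store"); // hypothetical name for the source store

        // Assumption: the filter result was not materialized, so it has no store name.
        String filteredStoreName = null;
        try {
            topology.connectProcessorAndStateStore("join", filteredStoreName);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "StateStore null is not added yet."
        }
    }
}
```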
> Adding a store name is not a sufficient workaround; it fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000005
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
building: Processor KSTREAM-JOIN-0000000005 has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
> 	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
> 	at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
> 	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can work around this by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> ------------------------------------------------------------------------------------------------------------
> Note that there is another minor, orthogonal issue with {{KTable#filter}} itself: it
does not include its parent's queryable store name when it is not itself materialized (see {{KTable#mapValues}}
for reference).

This message was sent by Atlassian JIRA
