kafka-jira mailing list archives

From "Andy Coates (Jira)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-10494) Streams: enableSendingOldValues should not call parent if node is itself materialized
Date Mon, 12 Oct 2020 17:12:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andy Coates updated KAFKA-10494:
    Affects Version/s: 2.7.0

> Streams: enableSendingOldValues should not call parent if node is itself materialized
> -------------------------------------------------------------------------------------
>                 Key: KAFKA-10494
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10494
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Andy Coates
>            Assignee: Andy Coates
>            Priority: Blocker
>             Fix For: 2.7.0
> On many nodes, e.g. `KTableFilter`, calling `enableSendingOldValues` unnecessarily calls `enableSendingOldValues` on the parent, even when the processor itself is materialized. This can force the parent table to be materialized when it does not need to be.
> For example:
> {code:java}
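> StreamsBuilder builder = new StreamsBuilder();
> builder
>      .table("t1", Consumed.with(...))             // source table t1, not explicitly materialized
>      .filter(predicate, Materialized.as("t2"))    // filtered table, materialized as "t2"
>      .<downStreamOps>                             // placeholder for further operations on t2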
> {code}
> If `downStreamOps` results in a call to `enableSendingOldValues` on the table returned by the `filter` call, i.e. `t2`, then `t1` will be materialized unnecessarily.
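To make the propagation concrete, here is a minimal sketch in plain Java of a hypothetical, heavily simplified table graph. `OldValuesPropagation`, `TableNode` and its methods are illustrative names, not Kafka's internal classes. The sketch assumes that a source table asked for old values must materialize itself (as when materialization is forced), and it compares the rule reported in this ticket, which always delegates upstream, with the proposed rule, where a materialized node serves old values from its own store.

```java
/**
 * Hypothetical, simplified model of a chain of KTable nodes (NOT Kafka's
 * internal classes). A node that owns a state store ("materialized") can
 * look up previous values itself; any other node must get them from upstream.
 */
class OldValuesPropagation {

    static final class TableNode {
        final TableNode parent;   // null for a source table
        boolean materialized;     // true if this node owns a state store
        boolean sendOldValues;    // true once the node forwards old values

        TableNode(TableNode parent, boolean materialized) {
            this.parent = parent;
            this.materialized = materialized;
        }

        /** Reported behavior: always delegates upstream, even if this node is materialized. */
        void enableOldValuesAsReported() {
            if (parent == null) {
                materialized = true;  // assumption: a source materializes to know old values
            } else {
                parent.enableOldValuesAsReported();
            }
            sendOldValues = true;
        }

        /** Proposed behavior: a materialized node answers from its own store. */
        void enableOldValuesProposed() {
            if (!materialized) {
                if (parent == null) {
                    materialized = true;  // same assumption as above
                } else {
                    parent.enableOldValuesProposed();
                }
            }
            sendOldValues = true;
        }
    }

    /** Builds t1 (unmaterialized source) -> t2 (materialized filter); reports whether t1 got materialized. */
    static boolean sourceMaterialized(boolean useProposedRule) {
        TableNode t1 = new TableNode(null, false);
        TableNode t2 = new TableNode(t1, true);
        if (useProposedRule) {
            t2.enableOldValuesProposed();
        } else {
            t2.enableOldValuesAsReported();
        }
        return t1.materialized;
    }

    public static void main(String[] args) {
        System.out.println("reported rule: t1 materialized = " + sourceMaterialized(false)); // true
        System.out.println("proposed rule: t1 materialized = " + sourceMaterialized(true));  // false
    }
}
```

Under the reported rule, `t1` ends up materialized even though `t2` already holds the previous values in its own store; under the proposed rule `t1` is left alone.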
> This ticket was raised off the back of [comments in a PR|#discussion_r490152263] while working on KAFKA-10077.
> A test that highlights this problem could be added to `KTableFilterTest`:
> {code:java}
> @Test
> public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {
>     // 'consumed', 'predicate' and doTestSendingOldValue(...) are assumed to be existing members of KTableFilterTest.
>     final StreamsBuilder builder = new StreamsBuilder();
>     final String topic1 = "topic1";
>     final KTableImpl<String, Integer, Integer> table1 =
>         (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
>     final KTableImpl<String, Integer, Integer> table2 =
>         (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));
>     // Do not force materialization: table2 has its own store ("store2"),
>     // so it should send old values without table1 being materialized.
>     table2.enableSendingOldValues(false);
>     doTestSendingOldValue(builder, table1, table2, topic1);
> }
> {code}
> However, this problem is not restricted to the `filter` call: other processor suppliers suffer from the same issue.
> In addition, once [https://github.com/apache/kafka/pull/9156] is merged, calling {{enableSendingOldValues}} without forcing materialization on a table that is itself materialized, but whose upstream is not, will _not_ enable sending old values on that table, even though it should.
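The second problem concerns the `boolean` argument of `enableSendingOldValues`, which the test above passes as `false` and which is taken here to be the force-materialization flag. The sketch below is again a hypothetical plain-Java model, not Kafka's code: `OldValuesContract`, `Source` and `Filter` are illustrative names, and it assumes, as a simplification, that the method returns whether the node will actually send old values. A materialized filter that only consults its parent reports `false` when the upstream source is unmaterialized and not forced, even though the filter's own store could supply the old values.

```java
/**
 * Hypothetical, simplified model (NOT Kafka's internal classes) of an
 * enableSendingOldValues(forceMaterialization) call that reports whether
 * the node will actually send old values.
 */
class OldValuesContract {

    interface TableNode {
        boolean enableSendingOldValues(boolean forceMaterialization);
    }

    /** A source can supply old values only if it is, or is forced to become, materialized. */
    static final class Source implements TableNode {
        boolean materialized;

        Source(boolean materialized) {
            this.materialized = materialized;
        }

        @Override
        public boolean enableSendingOldValues(boolean forceMaterialization) {
            if (forceMaterialization) {
                materialized = true;
            }
            return materialized;
        }
    }

    /** A filter node; 'checkOwnStoreFirst' switches between the reported and the proposed rule. */
    static final class Filter implements TableNode {
        final TableNode parent;
        final boolean materialized;
        final boolean checkOwnStoreFirst;
        boolean sendOldValues;

        Filter(TableNode parent, boolean materialized, boolean checkOwnStoreFirst) {
            this.parent = parent;
            this.materialized = materialized;
            this.checkOwnStoreFirst = checkOwnStoreFirst;
        }

        @Override
        public boolean enableSendingOldValues(boolean forceMaterialization) {
            if (checkOwnStoreFirst && materialized) {
                // Proposed rule: old values come from this node's own store; the parent is untouched.
                sendOldValues = true;
                return true;
            }
            // Reported rule (and the unmaterialized case): rely entirely on the parent.
            if (parent.enableSendingOldValues(forceMaterialization)) {
                sendOldValues = true;
            }
            return sendOldValues;
        }
    }

    public static void main(String[] args) {
        // Materialized filter over an unmaterialized source, called without forcing materialization.
        Filter reported = new Filter(new Source(false), true, false);
        Filter proposed = new Filter(new Source(false), true, true);
        System.out.println("reported rule: sends old values = " + reported.enableSendingOldValues(false)); // false
        System.out.println("proposed rule: sends old values = " + proposed.enableSendingOldValues(false)); // true
    }
}
```

Checking the node's own materialization before delegating fixes both problems in this model: the call succeeds without forcing, and the source is never asked to materialize.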

This message was sent by Atlassian Jira
