kafka-dev mailing list archives

From Mathieu Fenniak <mathieu.fenn...@replicon.com>
Subject Suppressing redundant KTable forwards
Date Sun, 04 Dec 2016 04:13:55 GMT
Hey all,

I'd like to contribute a new KTable API that would allow for the
suppression of redundant KTable forwards, and I'd like to solicit feedback
before I put together a patch.

A typical use case: you're using mapValues to pluck a subset of data out of
a topic, and you'd like changes to the record value that don't alter the
output of mapValues to not be forwarded downstream, where they trigger
expensive and redundant recalculations.

For example, topic "user" contains key:1, value:{"firstName": "Jack",
"lastName": "Brown"}.  builder.table("user").mapValues((user) ->
user.get("lastName"))  creates a KTable that forwards every update from
the user topic, even if lastName never changed.
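
To make the problem concrete, here is a minimal plain-Java sketch with no
Kafka dependency.  RedundantForwardDemo and its mapValues helper are
hypothetical stand-ins for the table operation, not the Kafka Streams API,
and the second update (firstName changing to "John") is an assumed input.
It requires Java 9 or later for List.of and Map.of.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a table's mapValues; not Kafka Streams code.
class RedundantForwardDemo {

    // Applies the mapper to every upstream update and forwards each result,
    // whether or not the mapped value differs from the previous one.
    static <K, V, R> List<Map.Entry<K, R>> mapValues(
            List<Map.Entry<K, V>> updates, Function<V, R> mapper) {
        List<Map.Entry<K, R>> forwarded = new ArrayList<>();
        for (Map.Entry<K, V> update : updates) {
            forwarded.add(new AbstractMap.SimpleImmutableEntry<>(
                    update.getKey(), mapper.apply(update.getValue())));
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Two updates for key 1; only firstName changes between them.
        List<Map.Entry<Integer, Map<String, String>>> updates = List.of(
                Map.entry(1, Map.of("firstName", "Jack", "lastName", "Brown")),
                Map.entry(1, Map.of("firstName", "John", "lastName", "Brown")));

        List<Map.Entry<Integer, String>> forwarded =
                mapValues(updates, user -> user.get("lastName"));

        // Both updates are forwarded although the mapped value is unchanged.
        System.out.println(forwarded.size() + " forwards: " + forwarded);
    }
}
```

Both updates reach downstream operations even though the second carries
the same lastName as the first; that second forward is the redundant one.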

My proposed API would add a filterRedundant method to KTable; one
overload takes a Comparator<V> to provide a custom comparison for
evaluating whether a change is redundant, and one parameterless overload
would use a comparator backed by the object's equals() method.

    /**
     * Creates a new instance of {@link KTable} that filters out redundant
     * updates and prevents "non-updates" from propagating to further
     * operations on the returned table.  A redundant update is one where
     * the same value is provided more than once for a given key.
     * Object.equals is used to compare whether a subsequent update has the
     * same value.
     *
     * @return a {@link KTable} that contains the same values as this table,
     *         but suppresses redundant updates
     */
    KTable<K, V> filterRedundant();

    /**
     * Creates a new instance of {@link KTable} that filters out redundant
     * updates and prevents "non-updates" from propagating to further
     * operations on the returned table.  A redundant update is one where
     * the same value is provided more than once for a given key.  A
     * user-provided comparator is used to compare whether a subsequent
     * update has the same value.
     *
     * @param comparator the comparator used to compare whether a subsequent
     *                   update has the same value
     * @return a {@link KTable} that contains the same values as this table,
     *         but suppresses redundant updates
     */
    KTable<K, V> filterRedundant(Comparator<V> comparator);
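
The intended semantics of the two methods above could be sketched in plain
Java roughly as follows.  This is only an illustration under assumptions,
not the patch itself: RedundantFilter and accept are hypothetical names, the
last forwarded value per key is kept in an in-memory HashMap rather than a
state store, a comparator result of 0 is taken to mean "same value", and a
null value followed by another null is treated as redundant.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the filterRedundant semantics; not Kafka Streams code.
class RedundantFilter<K, V> {
    // Last value forwarded downstream for each key.
    private final Map<K, V> lastForwarded = new HashMap<>();
    // null means "fall back to Object.equals" (the parameterless variant).
    private final Comparator<V> comparator;

    RedundantFilter() {
        this(null);
    }

    RedundantFilter(Comparator<V> comparator) {
        this.comparator = comparator;
    }

    // Returns true if the update should be forwarded, false if it is redundant.
    boolean accept(K key, V value) {
        if (lastForwarded.containsKey(key) && same(lastForwarded.get(key), value)) {
            return false;
        }
        lastForwarded.put(key, value);
        return true;
    }

    private boolean same(V previous, V current) {
        if (previous == null || current == null) {
            // Assumption: two consecutive nulls count as the same value.
            return previous == current;
        }
        return comparator == null
                ? previous.equals(current)
                : comparator.compare(previous, current) == 0;
    }

    public static void main(String[] args) {
        RedundantFilter<Integer, String> byEquals = new RedundantFilter<>();
        System.out.println(byEquals.accept(1, "Brown")); // first value for key 1
        System.out.println(byEquals.accept(1, "Brown")); // same value again
        System.out.println(byEquals.accept(1, "Smith")); // value changed

        // Custom comparison: values differing only by case count as the same.
        RedundantFilter<Integer, String> ignoringCase =
                new RedundantFilter<>(String.CASE_INSENSITIVE_ORDER);
        System.out.println(ignoringCase.accept(1, "Brown"));
        System.out.println(ignoringCase.accept(1, "BROWN"));
    }
}
```

The comparator variant is useful when equals() is too strict or not
implemented for the value type, for instance when only some fields of the
value matter to downstream operations.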
