kafka-dev mailing list archives

From "Phil Derome (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic
Date Sun, 26 Jun 2016 14:28:33 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350131#comment-15350131
] 

Phil Derome commented on KAFKA-3902:
------------------------------------

I am currently having a problem porting Confluent's UserRegionLambdaExample into the Kafka
build after converting it to JDK 7. The problem occurs at run time, with LongDeserializer in the
console consumer. I am investigating, and would say that the fix is not ready until this issue is addressed.

bin/kafka-console-consumer --topic LargeRegions --from-beginning \
> --zookeeper localhost:2181 --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
asia	Processed a total of 1 messages
[2016-06-26 10:22:22,126] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
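
The error text says that the value handed to LongDeserializer was not exactly 8 bytes long.
The sketch below is plain Java, not the Kafka class itself: decodeLong is a hypothetical
stand-in that applies the same kind of length check, to show how a value written as UTF-8
text would be rejected while a big-endian 8-byte long is accepted. Whether that is what
happens in the ported example is an assumption; the log above does not confirm it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class LongDecodeSketch {
    // Hypothetical stand-in for the length check implied by the error message.
    static long decodeLong(byte[] data) {
        if (data.length != 8) {
            throw new IllegalArgumentException(
                "Size of data received is not 8, got " + data.length);
        }
        // ByteBuffer reads big-endian by default.
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] asLong = ByteBuffer.allocate(8).putLong(3L).array();
        byte[] asText = "3".getBytes(StandardCharsets.UTF_8);

        System.out.println(decodeLong(asLong)); // 8 bytes: decodes to 3
        try {
            decodeLong(asText); // 1 byte: rejected by the length check
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

If the topic's values were produced with a serializer other than a long serializer, a check
of this kind would fail in the same way, so comparing the serde used to write LargeRegions
with the deserializer passed to the console consumer is one place to start.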



> Optimize KTable.filter() to reduce unnecessary traffic
> ------------------------------------------------------
>
>                 Key: KAFKA-3902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3902
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture, performance
>
> The {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be optimized
to reduce unnecessary data traffic to downstream operators. More specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g. if that operator
is an aggregation), then we need to materialize this KTable and send both its old value as
well as new value as a pair {old -> new} to the downstream operator. In practice it usually
needs to send the pair. 
> So let's discuss them separately. Take the following example source stream for
your KTable:
> {{<a: 1>, <b: 2>, <a: 3> ...}}
> When the KTable needs to be materialized, it will transform the source messages into
the pairs of:
> {{<a: \{null -> 1\}>, <b: \{null -> 2\}>, <a: \{1 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns false,
we MUST send a <key: null> to the downstream operator to indicate that this key is being
filtered out of the table. Otherwise, for example if your filter is "value < 2", then the updated
value <a: 3> will just be dropped and the downstream operator will still hold the stale <a: 1>,
resulting in incorrect semantics.
> If it returns true we should still send the original <key: value> to downstream
operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can consider:
>     a. If old value is <key: null> and new value is <key: not-null>, and
the filter predicate returns false for the new value, then in this case it is safe to optimize
and not return anything to the downstream operator, since in this case we know there was
no value for the key previously anyway; otherwise we send the original pair.
>     b. If old value is <key: not-null> and new value is <key: null>, indicating
to delete this key, and the filter predicate returns false for the old value, then in this
case it is safe to optimize and not return anything to the downstream operator, since we
know that the old value has already been filtered in a previous message; otherwise we send
the original pair.
>     c. If both old and new values are not null, and:
>         1) predicate returns true on both, send the original pair;
>         2) predicate returns false on both, we can optimize and not send anything;
>         3) predicate returns true on old and false on new, send the key: \{old -> null\};
>         4) predicate returns false on old and true on new, send the key: \{null ->
new\};
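
The cases in items 2 and 3 above can be sketched in a few lines of plain Java. This is only
an illustration under stated assumptions, not the actual {{KTableFilter}} code: the Change
class, filterWithOld and filterWithoutOld are hypothetical names, and a HashMap stands in
for the materialized KTable. It replays the example stream <a: 1>, <b: 2>, <a: 3> with the
filter "value < 2".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

final class KTableFilterSketch {
    /** Hypothetical {old -> new} pair; either side may be null. */
    static final class Change<V> {
        final V oldValue;
        final V newValue;
        Change(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
        @Override public String toString() {
            return "{" + oldValue + " -> " + newValue + "}";
        }
    }

    /** Item 2: "send old value" disabled. Always forwards; null marks the key as filtered out. */
    static <V> V filterWithoutOld(V newValue, Predicate<V> predicate) {
        return (newValue != null && predicate.test(newValue)) ? newValue : null;
    }

    /** Item 3: "send old value" enabled. An empty result means nothing is forwarded. */
    static <V> Optional<Change<V>> filterWithOld(Change<V> change, Predicate<V> predicate) {
        // A side that is null, or that the predicate rejects, is not visible downstream.
        V oldOut = (change.oldValue != null && predicate.test(change.oldValue))
            ? change.oldValue : null;
        V newOut = (change.newValue != null && predicate.test(change.newValue))
            ? change.newValue : null;
        if (oldOut == null && newOut == null) {
            return Optional.empty(); // cases a, b and c.2 when the predicate rejects
        }
        return Optional.of(new Change<>(oldOut, newOut)); // original pair, c.3 or c.4
    }

    public static void main(String[] args) {
        Predicate<Integer> lessThanTwo = v -> v < 2;
        Map<String, Integer> table = new HashMap<>(); // stand-in for the materialized table
        String[] keys = {"a", "b", "a"};
        Integer[] values = {1, 2, 3};
        for (int i = 0; i < keys.length; i++) {
            Integer old = table.put(keys[i], values[i]); // previous value, or null
            Optional<Change<Integer>> out =
                filterWithOld(new Change<Integer>(old, values[i]), lessThanTwo);
            System.out.println(keys[i] + ": "
                + out.map(Object::toString).orElse("(nothing sent)"));
        }
        // Prints:
        // a: {null -> 1}
        // b: (nothing sent)
        // a: {1 -> null}
    }
}
```

Masking each side of the pair independently and dropping the pair only when both sides end
up null reproduces every case listed in item 3, which is why the sketch needs no explicit
case analysis. A pair whose old and new values are both null is not covered by the
description; the sketch simply forwards nothing for it.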



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
