Date: Sat, 25 Jun 2016 05:05:16 +0000 (UTC)
From: "Guozhang Wang (JIRA)"
To: dev@kafka.apache.org
Subject: [jira] [Created] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic

Guozhang Wang created KAFKA-3902:
------------------------------------

             Summary: Optimize KTable.filter() to reduce unnecessary traffic
                 Key: KAFKA-3902
                 URL: https://issues.apache.org/jira/browse/KAFKA-3902
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Guozhang Wang

The {{KTable.filter()}} operator is implemented in
{{KTableFilter}}, and can be optimized to reduce unnecessary data traffic to downstream operators. More specifically:

1. Some context: when a KTable participates in a downstream operator (e.g. if that operator is an aggregation), we need to materialize this KTable and send both its old value and its new value as a pair {old -> new} to the downstream operator. In practice it usually needs to send the pair.

So let's discuss the two cases ("send old value" enabled or not) separately. Take the following example source stream for your KTable:

{{, , ...}}

When the KTable needs to be materialized, it will transform the source messages into the pairs of:

{{ 1}>, 2}>, 3}>}}

2. If "send old value" is not enabled, then when the filter predicate returns false, we MUST send a null value for the key to the downstream operator to indicate that this key is being filtered out of the table. Otherwise, for example if your filter is "value < 2", then the updated value will just be filtered out and the downstream operator will keep the stale earlier value, resulting in incorrect semantics.

If the predicate returns true, we should still send the original message to downstream operators.

3. If "send old value" is enabled, then there are a few cases to consider:

a. If the old value is null and the new value is not null, and the filter predicate returns false for the new value, then it is safe to optimize and not send anything to the downstream operator, since we know there was no value for the key previously anyway; otherwise we send the original pair.

b. If the old value is not null and the new value is null (indicating that this key is to be deleted), and the filter predicate returns false for the old value, then it is safe to optimize and not send anything to the downstream operator, since we know that the old value was already filtered out in a previous message; otherwise we send the original pair.

c. If both old and new values are not null, and:

1) the predicate returns true on both, send the original pair;
2) the predicate returns false on both, we can optimize and not send anything;
3) the predicate returns true on old and false on new, send the key: {old -> null};
4) the predicate returns false on old and true on new, send the key: {null -> new}.
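
The decision table in points 2 and 3 can be summarized in a few lines. The following is only a minimal sketch in Python, not the actual {{KTableFilter}} code: the function name filter_change, the (old, new) tuple representation, and the value-only predicate are all hypothetical simplifications chosen for illustration. It also assumes that when "send old value" is disabled the forwarded pair always has a null old value, and that a delete (null new value) is forwarded as a null, which the description above does not spell out.

```python
from typing import Callable, Optional, Tuple, TypeVar

V = TypeVar("V")
# A change is modelled as (old, new); None stands for a null value.
Change = Tuple[Optional[V], Optional[V]]


def filter_change(
    predicate: Callable[[V], bool],
    old: Optional[V],
    new: Optional[V],
    send_old_values: bool,
) -> Optional[Change]:
    """Return the pair to forward downstream, or None to forward nothing.

    Hypothetical helper: it models the cases from the issue description,
    not the real Kafka Streams implementation.
    """

    def keep(value: Optional[V]) -> Optional[V]:
        # A value survives only if it is present and passes the predicate.
        return value if value is not None and predicate(value) else None

    if not send_old_values:
        # Point 2: always forward. A rejected value becomes a null so that
        # downstream operators drop any earlier value for the key.
        return (None, keep(new))

    kept_old, kept_new = keep(old), keep(new)
    if kept_old is None and kept_new is None:
        # Cases 3a, 3b and 3c-2: the key was not visible downstream before
        # and is not visible now, so nothing needs to be sent.
        return None
    # Remaining cases: original pair, {old -> null}, or {null -> new}.
    return (kept_old, kept_new)


def less_than_two(value: int) -> bool:
    # The example filter "value < 2" from point 2.
    return value < 2


print(filter_change(less_than_two, None, 3, send_old_values=False))  # (None, None)
print(filter_change(less_than_two, None, 3, send_old_values=True))   # None
print(filter_change(less_than_two, 1, 3, send_old_values=True))      # (1, None)
print(filter_change(less_than_two, 3, 1, send_old_values=True))      # (None, 1)
```

Filtering both sides of the pair first and then checking whether anything is left collapses cases 3a, 3b and 3c into a single rule, which may make the optimization easier to review than one branch per case.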