From: Guozhang Wang
Date: Thu, 23 Jun 2016 15:13:42 -0700
Subject: Re: KTable.filter usage, memory consumption and materialized view semantics
To:
"dev@kafka.apache.org"

Hello Philippe,

I think your question really has two parts:

1. What is the semantic difference between a KTable and a KStream, and, more
   specifically, how should we interpret (key, null) in a KTable?

   You can find some explanation in this documentation:
   http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream

   Note that a KTable is itself still a stream behind the scenes, although it
   may be materialized when necessary. Specifically to your question, (key, null)
   can be treated as a tombstone for that key: when this KTable's stream is
   materialized, the record results in a "delete" on the materialized view.

2. As for the "filter" operator: yes, it will generate a large number of
   (key, null) records, each of which indicates a "delete" in the resulting
   KTable, and hence a lot of traffic to the piped topic. However, we are
   working on KIP-63, which unifies the caching mechanism so that it covers the
   `KTable.to` operator as well; de-duplication can then be done in that
   operator, and the outgoing traffic can be greatly reduced:
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams

Guozhang

On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome wrote:
> I made a modification of the latest Confluent example
> UserRegionLambdaExample. See the relevant code at the end of this email.
>
> Am I correct in understanding that KTable semantics should be similar to a
> store-backed cache of a view (as per Wikipedia on materialized views), or
> similar to Oracle's materialized views and indexed views? More
> specifically, I am looking at when a (key, null value) pair can make it
> into a KTable when generating a table from a valid KStream with a false
> filter.
>
> Here is the relevant code, modified from the example, for which I observed
> that all keys within userRegions are sent to the topic LargeRegions with a
> null value. I would think that both the regionCounts KTable and the topic
> LargeRegions should be empty, so that the cached view agrees with the
> intended query (a query with an intentionally empty result set, since the
> filter 1 >= 2 is always false).
>
> I am new to this and not sure I understand the implications properly, but
> it seems possible that a highly selective filter on a large incoming stream
> would result in high memory usage for regionCounts and hence for the stream
> application.
>
>     KTable<String, String> regionCounts = userRegions
>         // Count by region
>         // We do not need to specify any explicit serdes because the key
>         // and value types do not change
>         .groupBy((userId, region) -> KeyValue.pair(region, region))
>         .count("CountsByRegion")
>         // discard any regions FOR SAKE OF EXAMPLE
>         .filter((regionName, count) -> 1 >= 2)
>         .mapValues(count -> count.toString());
>
>     KStream<String, String> regionCountsForConsole = regionCounts.toStream();
>
>     regionCountsForConsole.to(stringSerde, stringSerde, "LargeRegions");
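To make the two points above concrete, here is a toy, in-memory Java model of the semantics discussed in this thread. It is not Kafka Streams code: the class ChangelogSketch and its methods filterAsChangelog, materialize and dedupeLatest are hypothetical names chosen for illustration. It assumes, as described above, that a table filter turns every update that fails the predicate into a (key, null) tombstone rather than dropping it, that materializing the changelog deletes the key on a tombstone, and that a de-duplicating cache of the kind proposed in KIP-63 forwards only the latest record per key when flushed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Toy in-memory model of KTable changelog semantics; NOT Kafka Streams code.
// All class and method names here are hypothetical, chosen for illustration.
public class ChangelogSketch {

    // One changelog record; a null value is a tombstone meaning "delete this key".
    public static final class Change {
        public final String key;
        public final Long value;

        public Change(String key, Long value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + key + ", " + value + ")";
        }
    }

    // Table-style filter (assumed behavior): an update failing the predicate is
    // forwarded as a tombstone instead of being dropped.
    public static List<Change> filterAsChangelog(List<Change> updates,
                                                 BiPredicate<String, Long> predicate) {
        List<Change> out = new ArrayList<>();
        for (Change c : updates) {
            boolean keep = c.value != null && predicate.test(c.key, c.value);
            out.add(new Change(c.key, keep ? c.value : null));
        }
        return out;
    }

    // Materialized view: upsert on a non-null value, delete on a tombstone.
    public static Map<String, Long> materialize(List<Change> changelog) {
        Map<String, Long> view = new LinkedHashMap<>();
        for (Change c : changelog) {
            if (c.value == null) {
                view.remove(c.key);
            } else {
                view.put(c.key, c.value);
            }
        }
        return view;
    }

    // Rough model of a de-duplicating cache: on flush, only the latest record
    // per key is forwarded (keys in order of first appearance).
    public static List<Change> dedupeLatest(List<Change> changelog) {
        Map<String, Change> latest = new LinkedHashMap<>();
        for (Change c : changelog) {
            latest.put(c.key, c);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        // Hypothetical running counts per region, as a count() might emit them.
        List<Change> counts = Arrays.asList(
                new Change("europe", 1L), new Change("asia", 1L),
                new Change("europe", 2L), new Change("asia", 2L),
                new Change("europe", 3L));

        // Always-false predicate, as in the quoted example.
        List<Change> none = filterAsChangelog(counts, (region, count) -> 1 >= 2);
        System.out.println(none);               // five tombstones
        System.out.println(materialize(none));  // {}
        System.out.println(dedupeLatest(none)); // [(europe, null), (asia, null)]

        // A selective but satisfiable predicate keeps only the matching keys.
        List<Change> large = filterAsChangelog(counts, (region, count) -> count >= 3);
        System.out.println(materialize(large)); // {europe=3}
    }
}
```

In this model the always-false filter still emits one record per input update, which is the traffic Philippe observed, yet the materialized view stays empty, so the view does agree with the intended query. De-duplicating before sending downstream collapses the five tombstones to one per region, which is the kind of reduction the caching work aims at.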