From: John Roesler
Date: Mon, 2 Jul 2018 18:17:04 -0500
Subject: Re: [DISCUSS] KIP-326: Schedulable KTable as Graph source
To: dev@kafka.apache.org
Reply-To: dev@kafka.apache.org
Hi Flávio,

Sure thing. And apologies in advance if I missed the point.

Below is some more-or-less realistic Java code to demonstrate how, given a
high-volume (heavily partitioned) stream of purchases, we can "step down"
the update rate with rate-limited intermediate aggregations. Please bear in
mind that the suppression API itself is still under debate, so this is just
for illustration purposes.

Basically, the "suppress" operator creates a processor whose job is just to
store the latest value for each key and not emit it until the configured
time. So if key "X" gets updated 1000x/sec, we can use suppress to make
sure it doesn't get emitted to the next processor more than once per second.

Does this make sense?

Thanks,
-John

public class KTableSuppressProcessorTest {
    private static class Purchase {
        final long customerId;
        final int value;

        private Purchase(final long customerId, final int value) {
            this.customerId = customerId;
            this.value = value;
        }
    }

    private static class PurchaseSerde implements Serde<Purchase> {...}

    public Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        final String purchases = "purchases";
        final KTable<Long, Purchase> input = builder.table(
            purchases,
            Consumed.with(Serdes.Long(), new PurchaseSerde())
        );

        // Fairly sloppy, but the idea is to "split" each customer id into one id per partition.
        // This way, we can first total their purchases inside each partition before aggregating them
        // across partitions
        final KTable<Long, Purchase> purchasesWithPartitionedCustomers =
            input.transformValues(() -> new ValueTransformerWithKey<Long, Purchase, Purchase>() {
                private ProcessorContext context;

                @Override
                public void init(final ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public Purchase transform(final Long readOnlyKey, final Purchase purchase) {
                    final int partition = context.partition();
                    return new Purchase(
                        purchase.customerId * 1000 + partition, // Assuming we have < 1k partitions...
                        purchase.value
                    );
                }
            });

        final KGroupedTable<Long, Integer> purchaseValueByPartitionedCustomer =
            purchasesWithPartitionedCustomers.groupBy(
                (id, purchase) -> new KeyValue<>(purchase.customerId, purchase.value)
            );

        final Suppression oncePerKeyPerSecond = Suppression.suppressIntermediateEvents(
            IntermediateSuppression
                .emitAfter(Duration.ofSeconds(1))
                .bufferKeys(5000)
                .bufferFullStrategy(EMIT)
        );

        // First level of aggregation. Each customer gets their purchases aggregated *just within each partition*.
        // The result of this aggregation is emitted at most once per second per customer per purchase-partition
        final KTable<Long, Integer> totalValueByPartitionedCustomer =
            purchaseValueByPartitionedCustomer
                .reduce((l, r) -> l + r, (l, r) -> l - r)
                .suppress(oncePerKeyPerSecond);

        // This is where we reverse the partitioning of each customer and then aggregate
        // each customer's purchases across partitions.
        // The result of this aggregation is emitted at most once per second per customer
        final KTable<Long, Integer> aggregatedTotalValueByPartitionedCustomer =
            totalValueByPartitionedCustomer
                .groupBy((key, value) -> new KeyValue<>(key / 1000, value))
                .reduce((l, r) -> l + r, (l, r) -> l - r)
                .suppress(oncePerKeyPerSecond);

        // Sending all the intermediate totals to a single key to get the final aggregation.
        // The result of this aggregation is emitted at most once per second
        final KTable<String, Integer> total =
            aggregatedTotalValueByPartitionedCustomer
                .groupBy((key, value) -> new KeyValue<>("ALL", value))
                .reduce((l, r) -> l + r, (l, r) -> l - r)
                .suppress(Suppression.suppressIntermediateEvents(
                    IntermediateSuppression.emitAfter(Duration.ofSeconds(1))
                ));

        // This topic will contain just one key ("ALL"), and the value will be
        // the ever-updating all-time purchase value.
        // Note that it'll be updated once per second.
        total.toStream().to("total-purchases-value");

        return builder.build();
    }
}

On Mon, Jul 2, 2018 at 3:38 PM flaviostutz@gmail.com wrote:

> Thanks for clarifying the real usage of KIP-328. Now I understood a bit
> better.
> I didn't see how that feature would be used to minimize the number of
> publications to the single partitioned output topic. When it falls into
> suppression, does the graph stop propagating downstream? Could you
> explain better? If that is possible I think it would be great.
>
> Thanks for the intervention!
>
> -Flávio Stutz
>
>
> On 2018/07/02 20:03:57, John Roesler wrote:
> > Hi Flávio,
> >
> > Thanks for the KIP. I'll apologize that I'm arriving late to the
> > discussion.
> > I've tried to catch up, but I might have missed some nuances.
> >
> > Regarding KIP-328, the idea is to add the ability to suppress
> > intermediate results from all KTables, not just windowed ones. I think
> > this could support your use case in combination with the strategy that
> > Guozhang proposed of having one or more pre-aggregation steps that
> > ultimately push into a single-partition topic for final aggregation.
> > Suppressing intermediate results would solve the problem you noted that
> > today pre-aggregating doesn't do much to stem the flow of updates.
> >
> > I'm not sure if this would be good enough for you overall; I just
> > wanted to clarify the role of KIP-328.
> > In particular, the solution you mentioned is to have the downstream
> > KTables actually query the upstream ones to compute their results. I'm
> > not sure whether it's more efficient to do these queries on a schedule,
> > or to have the upstream tables emit their results on the same schedule.
> >
> > What do you think?
> >
> > Thanks,
> > -John
> >
> > On Sun, Jul 1, 2018 at 10:03 PM flaviostutz@gmail.com
> > <flaviostutz@gmail.com> wrote:
> >
> > > From what I understood, that KIP is related to how KStreams will
> > > handle KTable updates in windowed scenarios to optimize resource usage.
> > > I couldn't see any specific relation to this KIP. Had you?
> > >
> > > -Flávio Stutz
> > >
> > >
> > > On 2018/06/29 18:14:46, "Matthias J. Sax" wrote:
> > > > Flavio,
> > > >
> > > > thanks for cleaning up the KIP number collision.
> > > >
> > > > With regard to KIP-328
> > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables)
> > > > I am wondering how both relate to each other?
> > > >
> > > > Any thoughts?
> > > >
> > > > -Matthias
> > > >
> > > > On 6/29/18 10:23 AM, flaviostutz@gmail.com wrote:
> > > > > Just copying a follow-up from another thread to here (sorry about
> > > > > the mess):
> > > > >
> > > > > From: Guozhang Wang
> > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > > > Date: 2018/06/25 22:24:17
> > > > > List: dev@kafka.apache.org
> > > > >
> > > > > Flávio, thanks for creating this KIP.
> > > > >
> > > > > I think this "single-aggregation" use case is common enough that
> > > > > we should consider how to support it efficiently: for example, for
> > > > > KSQL, which is built on top of Streams, we've seen lots of query
> > > > > statements whose return is expected to be a single row indicating
> > > > > the "total aggregate" etc. See
> > > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > > >
> > > > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953,
> > > > > but I'm wondering if we have discussed the option of supporting it
> > > > > in a "pre-aggregate" manner: that is, we do partial aggregates on
> > > > > parallel tasks, and then send the partial aggregated values via a
> > > > > single topic partition for the final aggregate, to reduce the
> > > > > traffic on that single partition and hence the final aggregate
> > > > > workload.
> > > > > Of course, for non-commutative aggregates we'd probably need to
> > > > > provide another API in addition to aggregate, like the `merge`
> > > > > function for session-based aggregates, to let users customize the
> > > > > operations of merging two partial aggregates into a single partial
> > > > > aggregate. What are its pros and cons compared with the current
> > > > > proposal?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On 2018/06/26 18:22:27, Flávio Stutz wrote:
> > > > >> Hey, guys, I've just created a new KIP about creating a new DSL
> > > > >> graph source for realtime partitioned consolidations.
> > > > >>
> > > > >> We have faced the following scenario/problem in a lot of
> > > > >> situations with KStreams:
> > > > >>   - Huge incoming data being processed by numerous application
> > > > >> instances
> > > > >>   - Need to aggregate different fields whose records span all
> > > > >> topic partitions (something like “total amount spent by people
> > > > >> aged 30 yrs” when processing a topic partitioned by userid).
> > > > >>
> > > > >> The challenge here is to manage this kind of situation without
> > > > >> any bottlenecks. We don't need the “global aggregation” to be
> > > > >> processed at each incoming message. In a scenario of 500
> > > > >> instances, each handling 1k messages/s, any single point of
> > > > >> aggregation (single partitioned topics, global tables or external
> > > > >> databases) would create a bottleneck of 500k messages/s for
> > > > >> single-threaded/CPU elements.
> > > > >>
> > > > >> For this scenario, it is possible to store the partial
> > > > >> aggregations in local stores and, from time to time, query those
> > > > >> states and aggregate them as a single value, avoiding
> > > > >> bottlenecks. This is a way to create a “timed aggregation
> > > > >> barrier”.
> > > > >>
> > > > >> If we leverage this kind of built-in feature we could greatly
> > > > >> enhance the ability of KStreams to better handle the CAP theorem
> > > > >> characteristics, so that one could choose to have Consistency
> > > > >> over Availability when needed.
> > > > >>
> > > > >> We started this discussion with Matthias J. Sax here:
> > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > > > >>
> > > > >> If you want to see more, go to KIP-326 at:
> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > > >>
> > > > >> -Flávio Stutz
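[Editorial aside, not part of the original thread: the core behavior John describes above — "store the latest value for each key and not emit it until the configured time" — can be sketched in plain Java, independent of the Kafka Streams API. The `SuppressionBuffer` class and its method names below are invented for illustration; the real API shape was still under discussion in KIP-328 at the time of this email.]

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SuppressionBufferSketch {

    // Hypothetical sketch of the buffering idea behind "suppress":
    // between emissions, only the latest value per key is retained.
    static final class SuppressionBuffer<K, V> {
        private final Map<K, V> latest = new LinkedHashMap<>();

        // Every upstream update lands here; an older buffered value
        // for the same key is simply overwritten.
        void update(final K key, final V value) {
            latest.put(key, value);
        }

        // Called when the emit interval elapses (in a real processor this
        // would be driven by a timer/punctuation): drain at most one
        // record per key and reset the buffer.
        Map<K, V> emit() {
            final Map<K, V> out = new LinkedHashMap<>(latest);
            latest.clear();
            return out;
        }
    }

    public static void main(final String[] args) {
        final SuppressionBuffer<String, Integer> buffer = new SuppressionBuffer<>();

        // Key "X" is updated 1000 times within one interval...
        for (int i = 1; i <= 1000; i++) {
            buffer.update("X", i);
        }
        buffer.update("Y", 7);

        // ...but when the interval fires, downstream sees only the
        // latest value per key.
        System.out.println(buffer.emit()); // prints {X=1000, Y=7}
    }
}
```

This is only the rate-limiting half of the story; the email's topology above additionally relies on subtractor-based `reduce` so that partial aggregates stay correct when suppressed updates replace earlier ones.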