From: Guozhang Wang
Date: Fri, 11 May 2018 17:02:01 -0700
Subject: Re: KIP-244: Add Record Header support to Kafka Streams
To: dev@kafka.apache.org

Yeah, I'm only talking about the DSL part (i.e. how the default inheritance protocol of stateful / stateless operators would be promised) to be managed with KIP-159.
For allowing users to override the default behavior in PAPI, that would be in a different KIP.

Guozhang

On Fri, May 11, 2018 at 10:41 AM, Matthias J. Sax wrote:

> I am actually not sure about this. Because it's about the semantics at PAPI level, but KIP-159 targets the DSL, it might actually be better to have a separate KIP?
>
> -Matthias
>
> On 5/11/18 9:26 AM, Guozhang Wang wrote:
>> That's a good question. I think we can manage this in KIP-159. I will go ahead and try to augment that KIP together with the original author Jeyhun.
>>
>> Guozhang
>>
>> On Fri, May 11, 2018 at 12:45 AM, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com> wrote:
>>> Thanks Guozhang and Matthias! I also agree with this way of handling headers inheritance. I will add them to the KIP doc.
>>>
>>>> We can discuss extending the current protocol, how to enable users to override those rules, and how to expose them in the DSL layer in a future KIP.
>>>
>>> About this, should this be managed in KIP-159 or a new one?
>>>
>>> On Thu, May 10, 2018 at 17:46, Matthias J. Sax (<matthias@confluent.io>) wrote:
>>>> Thanks Guozhang! Sounds good to me!
>>>>
>>>> -Matthias
>>>>
>>>> On 5/10/18 7:55 AM, Guozhang Wang wrote:
>>>>> Thanks for your thoughts, Matthias. I think if we do want to bring KIP-244 into 2.0 then we need to keep its scope small and well defined. For that I'm proposing:
>>>>>
>>>>> 1. Make the inheritance implementation of headers consistent with what we had with other record context fields, i.e. pass through the record context in `context.forward()`. Note that within a processor node, users can already manipulate the Headers with the given APIs, so at the time of forwarding, the library can just copy whatever is left / updated to the next processor node.
>>>>>
>>>>> 2. In the sink node, where a record is being sent to the Kafka topic, we should consider the following:
>>>>>
>>>>> a. For sink topics, we will set the headers into the producer record.
>>>>> b. For repartition topics, we will set the headers into the producer record.
>>>>> c. For changelog topics, we will drop the headers in the producer record since they will not be used in restoration and not stored in the state store either.
>>>>>
>>>>> We can discuss extending the current protocol, how to enable users to override those rules, and how to expose them in the DSL layer in a future KIP.
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Mon, May 7, 2018 at 5:49 PM, Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>> Guozhang,
>>>>>>
>>>>>> if you advocate forwarding headers by default, it might be a better default strategy to forward the headers for all operators (similar to topic/partition/offset metadata). It's usually harder for users to reason about different cases and thus I would prefer to have consistent behavior, i.e., only one default strategy instead of introducing different cases.
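The sink-node rule in points 2a to 2c of Guozhang's 5/10 proposal quoted above can be read as a small decision function. The following is a hypothetical, stdlib-only sketch of that rule, not Kafka Streams code: `SinkTopicKind` and `SinkHeaderRule` are illustrative names, and headers are simplified to a `String`-to-`String` map (real Kafka headers carry `byte[]` values and may repeat a key).

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of points 2a-2c; these are NOT Kafka Streams classes.
// Simplification: headers as a String->String map instead of Kafka's Headers.
enum SinkTopicKind { SINK, REPARTITION, CHANGELOG }

final class SinkHeaderRule {
    private SinkHeaderRule() {}

    /** Headers to place on the outgoing producer record for the given topic kind. */
    static Map<String, String> headersFor(SinkTopicKind kind, Map<String, String> contextHeaders) {
        switch (kind) {
            case SINK:        // (a) output topics: keep whatever is left in the record context
            case REPARTITION: // (b) repartition topics: keep them as well
                return new LinkedHashMap<>(contextHeaders); // defensive copy
            case CHANGELOG:   // (c) not used in restoration, not stored in the store: drop
                return Collections.<String, String>emptyMap();
            default:
                throw new IllegalArgumentException("unknown topic kind: " + kind);
        }
    }
}
```

Keeping the rule in one place makes the asymmetry explicit: only changelog writes differ from the pass-through default.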
>>>>>>
>>>>>> Btw: My argument about dropping headers by default only implies that users need to copy the headers explicitly to the output records in their code if they want to inspect them later -- it does not imply that headers cannot be forwarded downstream. (Not sure if this was clear.)
>>>>>>
>>>>>> I am also ok with copying by default though (for me, it's only a 51/49 preference for dropping by default).
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 5/7/18 4:52 PM, Guozhang Wang wrote:
>>>>>>> Hi Matthias,
>>>>>>>
>>>>>>> My concern with setting `null` in all cases is that it would make headers not very useful in KIP-244 then, because headers will only be available at the source stream / table, but not in any of the following instances. In practice users may be more likely to look into the headers later in the pipeline. Personally I'd suggest we pass the headers for all stateless operators in DSL and everywhere in PAPI's context.forward(). For repartition topics and sink topics, we also set them in the produced records accordingly; for changelog topics, we do not set them since they are not going to be used anywhere in the store.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Sun, May 6, 2018 at 9:03 PM, Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>>>> I agree that we should not block this KIP if possible. Nevertheless, we should try to get a reasonable default strategy for inheriting the headers so we don't need to change it later on.
>>>>>>>>
>>>>>>>> Let's see what others think. I still tend slightly to set it to `null` by default for all cases. If the default strategy is different for different operators as you suggest, it might be confusing to users.
>>>>>>>> IMHO, the default behavior should be as simple as possible.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 5/6/18 8:53 PM, Guozhang Wang wrote:
>>>>>>>>> Matthias, thanks for sharing your opinions on the inheritance protocol of the record context. I'm thinking maybe we should make this discussion a separate KIP by itself? If yes, then KIP-244's scope would be smaller, and within KIP-244 we can have a simple inheritance rule of setting it to null when 1) going through stateful operators and 2) sending to any topics.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>>>>>> Making the inheritance protocol a public contract seems reasonable to me.
>>>>>>>>>>
>>>>>>>>>> In the current implementation, all output records inherit the offset, timestamp, topic, and partition metadata from the input record. We already added an API to change the timestamp explicitly for the output record though.
>>>>>>>>>>
>>>>>>>>>> I think it makes sense to keep the inheritance of offset, topic, and partition. For headers, it's worth discussing. I see arguments for two strategies: (1) inherit by default, (2) set `null` by default. Independent of the default behavior, we should add an API to set headers for output records explicitly though (similar to the "set timestamp API").
>>>>>>>>>>
>>>>>>>>>> From my point of view, timestamp/headers are a different "class/category" of data/metadata than topic/partition/offset.
>>>>>>>>>> For the first category, it makes sense to manipulate them and it's more than "plain metadata"; especially the timestamp. For the second category it does not make sense to manipulate it, and to me topic/partition/offset is pure metadata only---strictly speaking, it's even questionable if output records should have any value for topic/partition/offset in the first place, or if they should be `null`, because those attributes only make sense for source records that are consumed from a topic directly. On the other hand, if we make this difference explicit, it might be useful information for the user to track the current topic/partition/offset of the original source record.
>>>>>>>>>>
>>>>>>>>>> Furthermore, to me, timestamps and headers are somewhat different, too. For stream processing it's required that every record has a timestamp; thus, it makes sense to inherit the input record timestamp by default (a timestamp is not really metadata but actually equally important to key and value from my point of view). Headers, however, are optional, and thus inheriting them is not really required. It might be convenient though: for example, imagine a simple "filter-only" application -- it would be cumbersome for users to explicitly copy the headers from the input records to the output records -- it seems to be unnecessary boilerplate code. On the other hand, for any other more complex use case, it's questionable to inherit headers---note that headers would be written to the output topics, increasing the size of the messages.
>>>>>>>>>> Overall, I am not sure which default strategy might be the better one for headers. Is there a convincing argument for either one of them? I slightly tend to think that using `null` as default might be better.
>>>>>>>>>>
>>>>>>>>>> Last, we could also make the default behavior configurable. Something like `inherit.record.headers=true/false` with default value "false". This would allow people to opt in to auto-header-inheritance. Just an idea I wanted to add to the discussion---not sure if it's a good one.
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 5/4/18 3:13 PM, Guozhang Wang wrote:
>>>>>>>>>>> Hello Jorge,
>>>>>>>>>>>
>>>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` being part of `RecordContext` would be handled the same way as other attributes.
>>>>>>>>>>>
>>>>>>>>>>> Today we do not have a clear inheritance protocol for other fields of RecordContext yet: although internally we do have some criteria on topic/partition/offset and timestamp, they are not explicitly exposed to users.
>>>>>>>>>>>
>>>>>>>>>>> I think we still need to have a defined protocol for headers itself, but I agree that it is better scoped outside of this KIP, since this inheritance protocol for all the fields of RecordContext would better be a separate KIP. We can document this clearly in the wiki page.
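Matthias's configurable-default idea quoted above (`inherit.record.headers`, default "false"), combined with the explicit "set headers" override he compares to the set-timestamp API, could look roughly like this. It is a sketch only: the config key was merely floated on the list and is assumed here, not an actual Kafka Streams setting, and `HeaderInheritancePolicy` is a hypothetical class.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: "inherit.record.headers" was only proposed in discussion
// and is NOT a real Kafka Streams config; the class name is illustrative.
final class HeaderInheritancePolicy {
    static final String INHERIT_CONFIG = "inherit.record.headers";

    private final boolean inherit;

    HeaderInheritancePolicy(Properties props) {
        // Default "false": output records start without headers unless the user opts in.
        this.inherit = Boolean.parseBoolean(props.getProperty(INHERIT_CONFIG, "false"));
    }

    /**
     * Headers for an output record: an explicit override (the analogue of the
     * "set timestamp" API) wins; otherwise inherit or drop according to the config.
     */
    Map<String, String> outputHeaders(Map<String, String> inputHeaders,
                                      Map<String, String> explicitOverride) {
        if (explicitOverride != null) {
            return new LinkedHashMap<>(explicitOverride);
        }
        if (inherit) {
            return new LinkedHashMap<>(inputHeaders);
        }
        return Collections.<String, String>emptyMap();
    }
}
```

Letting the explicit override take precedence over the config keeps the opt-in flag a pure default rather than a hard rule.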
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <garcia.florian.perso@gmail.com> wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> For me this is a great first step to have Headers in streaming. My current use case is about distributed tracing (Zipkin) and with the headers in the processorContext() I'll be able to manage that for most cases. KIP-159 should follow after this, but this is where all the major questions will arise for stateful operations (as Guozhang said).
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the work on this Jorge.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com> wrote:
>>>>>>>>>>>>> Thanks Guozhang and John for your feedback.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in our topology:
>>>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
>>>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be straight-forward.
>>>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and joins?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` being part of `RecordContext` would be handled the same way as other attributes.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to filter/map/branch", it may well be covered in KIP-159; worth taking a look at that KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, I will point to it.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. In terms of internal implementations, should the state store cache include the headers then in order to be sent downstream?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Good question. As `LRUCacheEntry` extends `RecordContext`, I think this is already supported. I will detail this in the KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should be removed?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Fixed, thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only for exposing the headers for reading, and not allowing users to add / modify headers, right? If yes, we'd better state it clearly at the "Proposed Changes" section.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As headers are exposed in the `ProcessorContext` and will be sent downstream, they can be mutated (add/remove headers).
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, despite the decreased scope in this KIP, I think it might be valuable to define what will happen to headers once this change is implemented. For example, I think a minimal groundwork-level change might be to make the API changes, while promising to drop all headers from input records.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would suggest passing headers to downstream nodes and not dropping them. Clients will have to drop `Headers` if they have used them. Or it could be something like a boolean config property that manages this. I would like to hear feedback here.
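Jorge's answer to point 2 (`LRUCacheEntry` extends `RecordContext`) rests on a structural fact: if the cache entry type is itself a record context, headers stored in that context travel with the cached value and are still there when the cache flushes downstream. The sketch below models only that idea with hypothetical stdlib classes (`ModelRecordContext`, `ModelCacheEntry`, `ModelCache`); it does not reproduce the real Kafka Streams caching code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: the cache entry IS a record context, so headers need
// no separate bookkeeping. None of these classes are Kafka Streams classes.
class ModelRecordContext {
    final long timestamp;
    final String topic;
    final Map<String, String> headers;

    ModelRecordContext(long timestamp, String topic, Map<String, String> headers) {
        this.timestamp = timestamp;
        this.topic = topic;
        this.headers = new LinkedHashMap<>(headers); // snapshot of the headers at put time
    }
}

final class ModelCacheEntry extends ModelRecordContext {
    final String value;

    ModelCacheEntry(String value, ModelRecordContext ctx) {
        super(ctx.timestamp, ctx.topic, ctx.headers);
        this.value = value;
    }
}

final class ModelCache {
    private final Map<String, ModelCacheEntry> entries = new LinkedHashMap<>();
    final List<ModelCacheEntry> forwarded = new ArrayList<>(); // stands in for downstream nodes

    void put(String key, String value, ModelRecordContext ctx) {
        entries.put(key, new ModelCacheEntry(value, ctx)); // a later put for the same key replaces the earlier one
    }

    void flush() {
        // Each entry carries its own context, headers included.
        forwarded.addAll(entries.values());
        entries.clear();
    }
}
```

In this model, because the cache keeps only the latest entry per key, only the latest record's headers survive a flush, which is one concrete form of the open question 1.c about stateful operators.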
>>>>>>>>>>>>>
>>>>>>>>>>>>>> A maximal groundwork change would be to forward the headers through all operators in Streams. But I think there are some unresolved questions about forwarding, like "what happens to the headers in a join?"
>>>>>>>>>>>>>
>>>>>>>>>>>>> Probably this would be solved once KIP-159 is implemented and supports Headers.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> There's of course some middle ground, but instinctively, I think I'd prefer to have a clear definition that headers are currently *not* forwarded, rather than having a complex list of operators that do or don't forward them. Plus, I think it might be tricky to define this behavior while not allowing the scope to return to that of your original proposal!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Agree. But `Headers` were forwarded *explicitly* in the original proposal. The current one passes them as part of `RecordContext`, so whether they are forwarded or not is the same as for `RecordContext`. On top of this implementation, we can design how filter/map/join will be handled, probably following the KIP-159 approach.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, May 2, 2018 at 22:56, Guozhang Wang (<wangguoz@gmail.com>) wrote:
>>>>>>>>>>>>>> Hi Jorge,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the written KIP! Made a pass over it and left some comments (some of them overlapped with John's):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in our topology:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
>>>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be straight-forward.
>>>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and joins?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. In terms of internal implementations, should the state store cache include the headers then in order to be sent downstream?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to filter/map/branch", it may well be covered in KIP-159; worth taking a look at that KIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should be removed?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only for exposing the headers for reading, and not allowing users to add / modify headers, right? If yes, we'd better state it clearly at the "Proposed Changes" section.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <john@confluent.io> wrote:
>>>>>>>>>>>>>>> Hi Jorge,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the design work.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I agree that de-scoping the work to just the Processor API will help contain the design and implementation complexity.
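Points 4 and 5 and the Processor-API-only scope converge on one shape: `process(key, value)` keeps its two arguments and the processor reads headers from its context. Below is a hypothetical stdlib model of that shape; `ModelProcessorContext`, `TraceTaggingProcessor`, `RecordingContext`, and the `trace-id` header (echoing Florian's Zipkin use case) are illustrative assumptions. The KIP's `context.headers()` accessor returns Kafka's `Headers` type, for which a plain map stands in here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "headers via the context, not a third process() argument".
// These interfaces mimic, but are NOT, the Kafka Streams Processor API.
interface ModelProcessorContext {
    Map<String, String> headers();          // headers of the record currently being processed
    void forward(String key, String value); // passes the record downstream
}

final class TraceTaggingProcessor {
    private final ModelProcessorContext context;

    TraceTaggingProcessor(ModelProcessorContext context) { this.context = context; }

    void process(String key, String value) {
        // No "Headers headers" parameter: the header is looked up on the context.
        String traceId = context.headers().getOrDefault("trace-id", "none");
        context.forward(key, value + "@" + traceId);
    }
}

// Minimal test double that records what is forwarded.
final class RecordingContext implements ModelProcessorContext {
    final Map<String, String> current = new LinkedHashMap<>();
    final List<String> forwarded = new ArrayList<>();

    @Override public Map<String, String> headers() { return current; }
    @Override public void forward(String key, String value) { forwarded.add(key + "=" + value); }
}
```

Keeping the signature unchanged is what lets existing `process` implementations compile untouched, which is the point behind removing the three-argument variant.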
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In the KIP, it mentions that the headers would be available in the ProcessorContext (like "context.headers()"). It also says that implementers would need to implement the method "void process(K key, V value, Headers headers);". I think maybe you meant to remove the proposal to modify "process", since it wouldn't be necessary in conjunction with the ProcessorContext change, and it's not represented in your PR.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also, despite the decreased scope in this KIP, I think it might be valuable to define what will happen to headers once this change is implemented. For example, I think a minimal groundwork-level change might be to make the API changes, while promising to drop all headers from input records.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> A maximal groundwork change would be to forward the headers through all operators in Streams. But I think there are some unresolved questions about forwarding, like "what happens to the headers in a join?"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There's of course some middle ground, but instinctively, I think I'd prefer to have a clear definition that headers are currently *not* forwarded, rather than having a complex list of operators that do or don't forward them.
>>>>>>>>>>>>>>> Plus, I think it might be tricky to define this behavior while not allowing the scope to return to that of your original proposal!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks again for the KIP,
>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com> wrote:
>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I've created a new JIRA to track this, updated the KIP and created a PR.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Looking forward to your feedback,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax (<matthias@confluent.io>) wrote:
>>>>>>>>>>>>>>>>> Hi Jorge,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would like to unblock this KIP to make some progress. The tricky question of this work seems to be how to expose headers at DSL level. This is related to KIP-149 and KIP-159. However, for the Processor API, it seems to be rather straightforward to add headers to the API.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thus, I would suggest de-scoping this KIP and adding header support for the Processor API only as a first step. If this is done, we can see in a second step how to add headers at DSL level.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> WDYT about this proposal?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If you agree, please update the JIRA and KIP accordingly. Note that we have two JIRAs that are duplicates atm.
We can scope them > >>>>>>>>>>>> accordingly: > >>>>>>>>>>>>>>>> one for PAPI only, and second as a dependent JIRA for DS= L. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote: > >>>>>>>>>>>>>>>>> Thanks for your feedback! > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 1. I was adding headers to KeyValue to support groupBy, > >> but > >>> I > >>>>>>>>>>>> think > >>>>>>>>>>>>>> it > >>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>> not necessary. It should be enough with mapping headers > to > >>>>>>>>>>>>> key/value > >>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> then group using current KeyValue structure. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on KV = as > >>>>>>>>>>>>> structure, > >>>>>>>>>>>>>>>> hence > >>>>>>>>>>>>>>>>> considering headers as part of stateful operations will > >> not > >>>>> fit > >>>>>>>>>>>> in > >>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>> approach and increase complexity (I cannot think in a > >>> use-case > >>>>>>>>>>>> that > >>>>>>>>>>>>>>> need > >>>>>>>>>>>>>>>>> this). > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 3. and 4. Changes on 1. will solve this issue. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Probably I rush a bit proposing this change, I was not > >> aware > >>>>> of > >>>>>>>>>>>>>> KIP-159 > >>>>>>>>>>>>>>>> or > >>>>>>>>>>>>>>>>> KAFKA-5632. > >>>>>>>>>>>>>>>>> If KIP-159 is adopted and we reduce this KIP to add > >> Headers > >>> to > >>>>>>>>>>>>>>>>> RecordContext will be enough, but I'm not sure about th= e > >>> scope > >>>>>>>>>>> of > >>>>>>>>>>>>>>>> KIP-159. > >>>>>>>>>>>>>>>>> If it includes stateful operations will be difficult to > >>>>>>>>>>>> implemented > >>>>>>>>>>>>>> as > >>>>>>>>>>>>>>>>> stated in 2. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>>> Jorge. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> El mar., 26 dic. 2017 a las 20:04, Matthias J. 
Sax (< > >>>>>>>>>>>>>>>> matthias@confluent.io>) > >>>>>>>>>>>>>>>>> escribi=C3=B3: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Thanks for the KIP Jorge, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> As Bill pointed out already, we should be careful with > >>> adding > >>>>>>>>>>>> new > >>>>>>>>>>>>>>>>>> overloads as this contradicts the work done via KIP-18= 2. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> This KIP also seems to be related to KIP-149 and > KIP-159. > >>> Are > >>>>>>>>>>>> you > >>>>>>>>>>>>>>> aware > >>>>>>>>>>>>>>>>>> of them? Both have quite long DISCUSS threads, but it > >> might > >>>>> be > >>>>>>>>>>>>> worth > >>>>>>>>>>>>>>>>>> browsing through them. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> A few further questions: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> - why do you want to add the headers to `KeyValue`? I > am > >>> not > >>>>>>>>>>>> sure > >>>>>>>>>>>>>> if > >>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>> should consider headers as optional metadata and add i= t > >> to > >>>>>>>>>>>>>>>>>> `RecordContext` similar to timestamp, offset, etc. onl= y > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> - You only include stateless single-record > >> transformations > >>>>> at > >>>>>>>>>>>> the > >>>>>>>>>>>>>> DSL > >>>>>>>>>>>>>>>>>> level. Do you suggest that all other operator just dro= p > >>>>>>>>>>> headers > >>>>>>>>>>>> on > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> floor? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> - Why do you only want to put headers into in-memory > and > >>>>>>>>>>> cache > >>>>>>>>>>>>> but > >>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>> RocksDB store? What do you mean by "pass through"? IMH= O, > >>> all > >>>>>>>>>>>>> stores > >>>>>>>>>>>>>>>>>> should behave the same at DSL level. > >>>>>>>>>>>>>>>>>> -> if we store the headers in the state stores, wha= t > >> is > >>>>> the > >>>>>>>>>>>>>> upgrade > >>>>>>>>>>>>>>>>>> path? 
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - Why do we need to store record headers in state in the first place, if we exclude stateful operators at DSL level?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What is the motivation for the "border lines" you chose?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
>>>>>>>>>>>>>>>>>>>> Jorge,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, I know this is a feature others in the community have been interested in getting into Kafka Streams.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I took a quick pass over it, and I have one initial question.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> We recently reduced overloads with KIP-182, and in this KIP we are increasing them again.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I can see from the KIP why they are necessary, but I'm wondering if there is something else we can do to cut down on the overloads introduced. I don't have any sound suggestions ATM, so I'll have to think about it some more, but I wanted to put the thought out there.
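Point 1 of Jorge's 12/30 reply quoted above suggests that grouping by header data needs no header-aware `KeyValue`: map the header into the key, then group on the plain key/value pair. A hypothetical stdlib sketch of that two-step idea follows; the `tenant` header and the class names `HeaderedRecord` and `GroupByHeader` are illustrative assumptions, and this is not DSL code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: re-key each record by one header value so that an
// ordinary key-based group-and-count needs no header-aware KeyValue.
final class HeaderedRecord {
    final String key;
    final String value;
    final Map<String, String> headers;

    HeaderedRecord(String key, String value, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        this.headers = headers;
    }
}

final class GroupByHeader {
    private GroupByHeader() {}

    /** Step 1: "map" the header into the key, producing a plain key/value pair. */
    static SimpleEntry<String, String> rekey(HeaderedRecord r, String headerName) {
        return new SimpleEntry<>(r.headers.getOrDefault(headerName, "unknown"), r.value);
    }

    /** Step 2: group by the new key and count, using only the key/value pair. */
    static Map<String, Long> countBy(List<HeaderedRecord> records, String headerName) {
        Map<String, Long> counts = new TreeMap<>();
        for (HeaderedRecord r : records) {
            counts.merge(rekey(r, headerName).getKey(), 1L, Long::sum);
        }
        return counts;
    }
}
```

Once the header value lives in the key, the grouping step is ordinary key-based logic, which is why the extra `KeyValue` field becomes unnecessary.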
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I have created a KIP to add Record Headers support to Kafka Streams API:
>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The main goal is to be able to use headers to filter, map and process records as streams. Stateful processing (joins, windows) is not considered.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Proposed changes/Draft:
>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Feedback and suggestions are more than welcome.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -- Guozhang

--
-- Guozhang
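The original goal stated in Jorge's first message, using headers to filter records, can be sketched as a header-based filter whose surviving records keep their headers unchanged (the pass-through behavior Matthias describes for a "filter-only" application). `KeyedRecord`, `HeaderFilter`, and the `env` header are hypothetical names chosen for illustration; this is a stdlib model, not the Kafka Streams DSL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of a header-based filter; these are NOT Kafka classes.
final class KeyedRecord {
    final String key;
    final String value;
    final Map<String, String> headers;

    KeyedRecord(String key, String value, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        this.headers = headers;
    }
}

final class HeaderFilter {
    private HeaderFilter() {}

    /** Keeps records whose headers satisfy the predicate; survivors keep their headers as-is. */
    static List<KeyedRecord> filter(List<KeyedRecord> input, Predicate<Map<String, String>> onHeaders) {
        List<KeyedRecord> out = new ArrayList<>();
        for (KeyedRecord r : input) {
            if (onHeaders.test(r.headers)) {
                out.add(r); // same object: headers pass through without explicit copying
            }
        }
        return out;
    }
}
```

For example, `HeaderFilter.filter(records, h -> "prod".equals(h.get("env")))` keeps only records tagged `env=prod`, with no boilerplate to carry their headers forward.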