kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: KIP-244: Add Record Header support to Kafka Streams
Date Sat, 12 May 2018 00:02:01 GMT
Yeah I'm only talking about the DSL part (i.e. how stateful / stateless
operators default inheritance protocol would be promised) to be managed
with KIP-159.

For allowing users to override the default behavior in PAPI, that would be
in a different KIP.


Guozhang


On Fri, May 11, 2018 at 10:41 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> I am actually not sure about this. Because it's about the semantics at
> PAPI level, but KIP-159 targets the DSL, it might actually be better to
> have a separate KIP?
>
> -Matthias
>
> On 5/11/18 9:26 AM, Guozhang Wang wrote:
> > That's a good question. I think we can manage this in KIP-159. I will go
> > ahead and try to augment that KIP together with the original author
> Jeyhun.
> >
> >
> > Guozhang
> >
> > On Fri, May 11, 2018 at 12:45 AM, Jorge Esteban Quilcate Otoya <
> > quilcate.jorge@gmail.com> wrote:
> >
> >> Thanks Guozhang and Matthias! I do also agree with this way of handling
> >> headers inheritance. I will add them to the KIP doc.
> >>
> >>> We can discuss about extending the current protocol and how to enable
> >> users
> >>> override those rule, and how to expose them in the DSL layer in a
> future
> >>> KIP.
> >>
> >> About this, should this be managed on KIP-159 or a new one?
> >>
> >> El jue., 10 may. 2018 a las 17:46, Matthias J. Sax (<
> matthias@confluent.io
> >>> )
> >> escribió:
> >>
> >>> Thanks Guozhang! Sounds good to me!
> >>>
> >>> -Matthias
> >>>
> >>> On 5/10/18 7:55 AM, Guozhang Wang wrote:
> >>>> Thanks for your thoughts Matthias. I think if we do want to bring
> >> KIP-244
> >>>> into 2.0 then we need to keep its scope small and well defined. For
> >> that
> >>>> I'm proposing:
> >>>>
> >>>> 1. Make the inheritance implementation of headers consistent with what
> >> we
> >>>> had with other record context fields. I.e. pass through the record
> >>> context
> >>>> in `context.forward()`. Note that within a processor node, users can
> >>>> already manipulate the Headers with the given APIs, so at the time of
> >>>> forwarding, the library can just copy what-ever is left / updated to
> >> the
> >>>> next processor node.
> >>>>
> >>>> 2. In the sink node, where a record is being sent to the Kafka topic,
> >> we
> >>>> should consider the following:
> >>>>
> >>>> a. For sink topics, we will set the headers into the producer record.
> >>>> b. For repartition topics, we will the headers into the producer
> >> record.
> >>>> c. For changelog topics, we will drop the headers in the produce
> record
> >>>> since they will not be used in restoration and not stored in the state
> >>>> store either.
> >>>>
> >>>>
> >>>> We can discuss about extending the current protocol and how to enable
> >>> users
> >>>> override those rule, and how to expose them in the DSL layer in a
> >> future
> >>>> KIP.
> >>>>
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Mon, May 7, 2018 at 5:49 PM, Matthias J. Sax <
> matthias@confluent.io
> >>>
> >>>> wrote:
> >>>>
> >>>>> Guozhang,
> >>>>>
> >>>>> if you advocate to forward headers by default, it might be a better
> >>>>> default strategy do forward the headers for all operators (similar to
> >>>>> topic/partition/offset metadata). It's usually harder for users to
> >>>>> reason about different cases and thus I would prefer to have
> >> consistent
> >>>>> behavior, ie, only one default strategy instead of introducing
> >> different
> >>>>> cases.
> >>>>>
> >>>>> Btw: My argument about dropping headers by default only implies, that
> >>>>> users need to copy the headers explicitly to the output records in
> >> there
> >>>>> code of they want to inspect them later -- it does not imply that
> >>>>> headers cannot be forwarded downstream. (Not sure if this was clear).
> >>>>>
> >>>>> I am also ok with copying be default thought (for me, it's a 51/49
> >>>>> preference for dropping by default only).
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 5/7/18 4:52 PM, Guozhang Wang wrote:
> >>>>>> Hi Matthias,
> >>>>>>
> >>>>>> My concern of setting `null` in all cases is that it would make
> >> headers
> >>>>> not
> >>>>>> very useful in KIP-244 then, because headers will only be available
> >> at
> >>>>> the
> >>>>>> source stream / table, but not in any of the following instances. In
> >>>>>> practice users may be more likely to look into the headers later in
> >> the
> >>>>>> pipeline. Personally I'd suggest we pass the headers for all
> >> stateless
> >>>>>> operators in DSL and everywhere in PAPI's context.forward(). For
> >>>>>> repartition topics and sink topics, we also set them in the produced
> >>>>>> records accordingly; for changelog topics, we do not set them since
> >>> they
> >>>>>> are not going to be used anywhere in the store.
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>> On Sun, May 6, 2018 at 9:03 PM, Matthias J. Sax <
> >> matthias@confluent.io
> >>>>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> I agree, that we should not block this KIP if possible.
> >> Nevertheless,
> >>> we
> >>>>>>> should try to get a reasonable default strategy for inheriting the
> >>>>>>> headers so we don't need to change it later on.
> >>>>>>>
> >>>>>>> Let's see what other think. I still tend slightly to set to `null`
> >> by
> >>>>>>> default for all cases. If the default strategy is different for
> >>>>>>> different operators as you suggest, it might be confusion to users.
> >>>>>>> IMHO, the default behavior should be as simple as possible.
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>> On 5/6/18 8:53 PM, Guozhang Wang wrote:
> >>>>>>>> Matthias, thanks for sharing your opinions in the inheritance
> >>> protocol
> >>>>> of
> >>>>>>>> the record context. I'm thinking maybe we should make this
> >> discussion
> >>>>> as
> >>>>>>> a
> >>>>>>>> separate KIP by itself? If yes, then KIP-244's scope would be
> >>> smaller,
> >>>>>>> and
> >>>>>>>> within KIP-244 we can have a simple inheritance rule that setting
> >> it
> >>> to
> >>>>>>>> null when 1) going through stateful operators and 2) sending to
> any
> >>>>>>> topics.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>> On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <
> >>>>> matthias@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Making the inheritance protocol a public contract seems
> reasonable
> >>> to
> >>>>>>> me.
> >>>>>>>>>
> >>>>>>>>> In the current implementation, all output records inherits the
> >>> offset,
> >>>>>>>>> timestamp, topic, and partition metadata from the input record.
> We
> >>>>>>>>> already added an API to change the timestamp explicitly for the
> >>> output
> >>>>>>>>> record thought.
> >>>>>>>>>
> >>>>>>>>> I think it make sense to keep the inheritance of offset, topic,
> >> and
> >>>>>>>>> partition. For headers, it's worth to discuss. I see arguments
> for
> >>> two
> >>>>>>>>> strategies: (1) inherit by default, (2) set `null` by default.
> >>>>>>>>> Independent of the default behavior, we should add an API to set
> >>>>> headers
> >>>>>>>>> for output records explicitly though (similar to the "set
> >> timestamp
> >>>>>>> API").
> >>>>>>>>>
> >>>>>>>>> From my point of view, timestamp/headers are a different
> >>>>>>>>> "class/category" of data/metadata than topic/partition/offset.
> For
> >>> the
> >>>>>>>>> first category, it makes sense to manipulate them and it's more
> >> than
> >>>>>>>>> "plain metadata"; especially the timestamp. For the second
> >> category
> >>> it
> >>>>>>>>> does not make sense to manipulate it, and to me
> >>> topic/partition/offset
> >>>>>>>>> is pure metadata only---strictly speaking, it's even questionable
> >> if
> >>>>>>>>> output records should have any value for topic/partition/offset
> in
> >>> the
> >>>>>>>>> first place, or if they should be `null`, because those
> attributes
> >>> do
> >>>>>>>>> only make sense for source records that are consumed from a topic
> >>>>>>>>> directly only. On the other hand, if we make this difference
> >>> explicit,
> >>>>>>>>> it might be useful information for the use to track the current
> >>>>>>>>> topic/partition/offset of the original source record.
> >>>>>>>>>
> >>>>>>>>> Furthermore, to me, timestamps and headers are somewhat
> different,
> >>>>> too.
> >>>>>>>>> For stream processing it's required that every record has a
> >>> timestamp;
> >>>>>>>>> thus, it make sense to inherit the input record timestamp by
> >> default
> >>>>> (a
> >>>>>>>>> timestamp is not really metadata but actually equally important
> to
> >>> key
> >>>>>>>>> and value from my point of view). Header however are optional,
> and
> >>>>> thus
> >>>>>>>>> inheriting them is not really required. It might be convenient
> >>> though:
> >>>>>>>>> for example, imagine a simple "filter-only" application -- it
> >> would
> >>> be
> >>>>>>>>> cumbersome for users to explicitly copy the headers from the
> input
> >>>>>>>>> records to the output records -- it seems to be unnecessary
> >>>>> boilerplate
> >>>>>>>>> code. On the other hand, for any other more complex use case,
> it's
> >>>>>>>>> questionable to inherit headers---note, that headers would be
> >>> written
> >>>>> to
> >>>>>>>>> the output topics increasing the size of the messages. Overall, I
> >> am
> >>>>> not
> >>>>>>>>> sure which default strategy might be the better one for headers.
> >> Is
> >>>>>>>>> there a convincing argument for either one of them? I slightly
> >> tend
> >>> to
> >>>>>>>>> think that using `null` as default might be better.
> >>>>>>>>>
> >>>>>>>>> Last, we could also make the default behavior configurable.
> >>> Something
> >>>>>>>>> like `inherit.record.headers=true/false` with default value
> >> "false".
> >>>>>>>>> This would allow people to opt-in for auto-header-inheritance.
> >> Just
> >>> an
> >>>>>>>>> idea I wanted to add to the discussion---not sure if it's a good
> >>> one.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 5/4/18 3:13 PM, Guozhang Wang wrote:
> >>>>>>>>>> Hello Jorge,
> >>>>>>>>>>
> >>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` been part of
> >>>>>>>>> `RecordContext`
> >>>>>>>>>> would be handled the same way as other attributes.
> >>>>>>>>>>
> >>>>>>>>>> Today we do not have a clear inheritance protocol for other
> >> fields
> >>> of
> >>>>>>>>>> RecordContext yet: although internally we do have some criterion
> >> on
> >>>>>>>>>> topic/partition/offset and timestamp, they are not explicitly
> >>> exposed
> >>>>>>> to
> >>>>>>>>>> users.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I think we still need to have a defined protocol for headers
> >>> itself,
> >>>>>>> but
> >>>>>>>>> I
> >>>>>>>>>> agree that it better to be scoped out side of this KIP, since
> >> this
> >>>>>>>>>> inheritance protocol itself for all the fields of RecordContext
> >>> would
> >>>>>>>>>> better be a separate KIP. We can document this clearly in the
> >> wiki
> >>>>>>> page.
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <
> >>>>>>>>>> garcia.florian.perso@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> For me this is a great first step to have Headers in streaming.
> >>>>>>>>>>> My current use case is about distributed tracing (Zipkin) and
> >> with
> >>>>> the
> >>>>>>>>>>> headers in the processorContext() I'll be able to manage that
> >> for
> >>>>> the
> >>>>>>>>> most
> >>>>>>>>>>> cases.
> >>>>>>>>>>> The KIP-159 should follow after this but this is where all the
> >>> major
> >>>>>>>>>>> questions will arise for stateful operations (as Guozhang
> said).
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the work on this Jorge.
> >>>>>>>>>>>
> >>>>>>>>>>> Le ven. 4 mai 2018 à 01:04, Jorge Esteban Quilcate Otoya <
> >>>>>>>>>>> quilcate.jorge@gmail.com> a écrit :
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks Guozhang and John for your feedback.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in
> >>> our
> >>>>>>>>>>>> topology:
> >>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be
> >>>>>>> straight-forward.
> >>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be
> >> straight-forward.
> >>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and
> >> joins?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` been part of
> >>>>>>>>>>>> `RecordContext` would be handled the same way as other
> >>> attributes.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to
> >>>>>>>>>>>> filter/map/branch",
> >>>>>>>>>>>> it may well be covered in KIP-159; worth taking a look at that
> >>> KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Yes, I will point to it.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 2. In terms of internal implementations, should the state
> >> store
> >>>>>>>>>>>> cache include the headers then in order to be sent
> downstreams?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Good question. As `LRUCacheEntry` extends `RecordContext`, I
> >>> thinks
> >>>>>>>>> this
> >>>>>>>>>>> is
> >>>>>>>>>>>> already supported. I will detail this on the KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)",
> >> this
> >>>>>>> should
> >>>>>>>>>>> be
> >>>>>>>>>>>> removed?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Fixed, thanks.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope
> >> is
> >>>>>>> only
> >>>>>>>>>>>> for exposing
> >>>>>>>>>>>> the headers for reading, and not allowing users to add /
> modify
> >>>>>>>>> headers,
> >>>>>>>>>>>> right? If yes, we'd better state it clearly at the "Proposed
> >>>>> Changes"
> >>>>>>>>>>>> section.
> >>>>>>>>>>>>
> >>>>>>>>>>>> As headers is exposed in the `ProcessContext`, and headers
> will
> >>> be
> >>>>>>> send
> >>>>>>>>>>>> downstream, it can be mutated (add/remove headers).
> >>>>>>>>>>>>
> >>>>>>>>>>>>  > Also, despite the decreased scope in this KIP, I think it
> >>> might
> >>>>> be
> >>>>>>>>>>>> valuable to define what will happen to headers once this
> change
> >>> is
> >>>>>>>>>>>> implemented. For example, I think a minimal groundwork-level
> >>> change
> >>>>>>>>> might
> >>>>>>>>>>>> be to make the API changes, while promising to drop all
> headers
> >>>>> from
> >>>>>>>>>>> input
> >>>>>>>>>>>> records.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I will suggest to pass headers to downstream nodes, and don't
> >>> drop
> >>>>>>>>> yhrm.
> >>>>>>>>>>>> Clients will have to drop `Headers` if they have used them.
> >>>>>>>>>>>> Or it could be something like a boolean config property that
> >>> manage
> >>>>>>>>> this.
> >>>>>>>>>>>> I would like to hear feedback here.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> A maximal groundwork change would be to forward the headers
> >>>>> through
> >>>>>>>>> all
> >>>>>>>>>>>> operators
> >>>>>>>>>>>> in
> >>>>>>>>>>>>
> >>>>>>>>>>>> Streams. But I think there are some unresolved questions about
> >>>>>>>>>>> forwarding,
> >>>>>>>>>>>> like "what happens to the headers in a join?"
> >>>>>>>>>>>> Probably this would be solve once KIP-159 is implemented and
> >>>>>>> supporting
> >>>>>>>>>>>> Headers.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> There's of course some middle ground, but instinctively, I
> >> think
> >>>>> I'd
> >>>>>>>>>>>> prefer to have a clear definition that headers are currently
> >>> *not*
> >>>>>>>>>>>> forwarded, rather than having a complex list of operators that
> >> do
> >>>>> or
> >>>>>>>>>>> don't
> >>>>>>>>>>>> forward them. Plus, I think it might be tricky to define this
> >>>>>>> behavior
> >>>>>>>>>>>> while not allowing the scope to return to that of your
> original
> >>>>>>>>> proposal!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Agree. But `Headers` were forwarded *explicitly* in the
> >> original
> >>>>>>>>>>> proposal.
> >>>>>>>>>>>> The current one pass it as part of `RecordContext`, so if it's
> >>>>>>> forward
> >>>>>>>>> it
> >>>>>>>>>>>> or not is as the same as `RecordContext`.
> >>>>>>>>>>>> On top of this implementation, we can design how
> >> filter/map/join
> >>>>> will
> >>>>>>>>> be
> >>>>>>>>>>>> handled. Probably following KIP-159 approach.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Jorge.
> >>>>>>>>>>>>
> >>>>>>>>>>>> El mié., 2 may. 2018 a las 22:56, Guozhang Wang (<
> >>>>> wangguoz@gmail.com
> >>>>>>>> )
> >>>>>>>>>>>> escribió:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Jorge,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the written KIP! Made a pass over it and left some
> >>>>>>> comments
> >>>>>>>>>>>>> (some of them overlapped with John's):
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in
> >>> our
> >>>>>>>>>>>> topology:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be
> >>>>>>> straight-forward.
> >>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be
> >> straight-forward.
> >>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and
> >> joins?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2. In terms of internal implementations, should the state
> >> store
> >>>>>>> cache
> >>>>>>>>>>>>> include the headers then in order to be sent downstreams?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to
> >>>>>>>>>>>>> filter/map/branch", it may well be covered in KIP-159; worth
> >>>>> taking
> >>>>>>> a
> >>>>>>>>>>>> look
> >>>>>>>>>>>>> at that KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)",
> >> this
> >>>>>>> should
> >>>>>>>>>>> be
> >>>>>>>>>>>>> removed?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope
> >> is
> >>>>>>> only
> >>>>>>>>>>> for
> >>>>>>>>>>>>> exposing the headers for reading, and not allowing users to
> >> add
> >>> /
> >>>>>>>>>>> modify
> >>>>>>>>>>>>> headers, right? If yes, we'd better state it clearly at the
> >>>>>>> "Proposed
> >>>>>>>>>>>>> Changes" section.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <
> >> john@confluent.io
> >>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Jorge,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the design work.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I agree that de-scoping the work to just the Processor API
> >> will
> >>>>>>> help
> >>>>>>>>>>>>>> contain the design and implementation complexity.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> In the KIP, it mentions that the headers would be available
> >> in
> >>>>> the
> >>>>>>>>>>>>>> ProcessorContext, (like "context.headers()"). It also says
> >> that
> >>>>>>>>>>>>>> implementers would need to implement the method "void
> >> process(K
> >>>>>>> key,
> >>>>>>>>>>> V
> >>>>>>>>>>>>>> value, Headers headers);". I think maybe you meant to remove
> >>> the
> >>>>>>>>>>>> proposal
> >>>>>>>>>>>>>> to modify "process", since it wouldn't be necessary in
> >>>>> conjunction
> >>>>>>>>>>> with
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>> ProcessorContext change, and it's not represented in your
> PR.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Also, despite the decreased scope in this KIP, I think it
> >> might
> >>>>> be
> >>>>>>>>>>>>> valuable
> >>>>>>>>>>>>>> to define what will happen to headers once this change is
> >>>>>>>>>>> implemented.
> >>>>>>>>>>>>> For
> >>>>>>>>>>>>>> example, I think a minimal groundwork-level change might be
> >> to
> >>>>> make
> >>>>>>>>>>> the
> >>>>>>>>>>>>> API
> >>>>>>>>>>>>>> changes, while promising to drop all headers from input
> >>> records.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> A maximal groundwork change would be to forward the headers
> >>>>> through
> >>>>>>>>>>> all
> >>>>>>>>>>>>>> operators in Streams. But I think there are some unresolved
> >>>>>>> questions
> >>>>>>>>>>>>> about
> >>>>>>>>>>>>>> forwarding, like "what happens to the headers in a join?"
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> There's of course some middle ground, but instinctively, I
> >>> think
> >>>>>>> I'd
> >>>>>>>>>>>>> prefer
> >>>>>>>>>>>>>> to have a clear definition that headers are currently *not*
> >>>>>>>>>>> forwarded,
> >>>>>>>>>>>>>> rather than having a complex list of operators that do or
> >> don't
> >>>>>>>>>>> forward
> >>>>>>>>>>>>>> them. Plus, I think it might be tricky to define this
> >> behavior
> >>>>>>> while
> >>>>>>>>>>>> not
> >>>>>>>>>>>>>> allowing the scope to return to that of your original
> >> proposal!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks again for the KIP,
> >>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya
> >> <
> >>>>>>>>>>>>>> quilcate.jorge@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I've created a new JIRA to track this, updated the KIP and
> >>>>> create
> >>>>>>> a
> >>>>>>>>>>>> PR.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Looking forward to your feedback,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jorge.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> El mar., 13 feb. 2018 a las 22:43, Matthias J. Sax (<
> >>>>>>>>>>>>>> matthias@confluent.io
> >>>>>>>>>>>>>>>> )
> >>>>>>>>>>>>>>> escribió:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Jorge,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I would like to unblock this KIP to make some progress.
> The
> >>>>>>>>>>> tricky
> >>>>>>>>>>>>>>>> question of this work, seems to be how to expose headers
> at
> >>> DSL
> >>>>>>>>>>>>> level.
> >>>>>>>>>>>>>>>> This related to KIP-149 and KIP-159. However, for
> Processor
> >>>>> API,
> >>>>>>>>>>> it
> >>>>>>>>>>>>>>>> seems to be rather straight forward to add headers to the
> >>> API.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thus, I would suggest to de-scope this KIP and add header
> >>>>> support
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>> Processor API only as a first step. If this is done, we
> can
> >>> see
> >>>>>>>>>>> in
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>>> second step, how to add headers at DSL level.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> WDYT about this proposal?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If you agree, please update the JIRA and KIP accordingly.
> >>> Note,
> >>>>>>>>>>>> that
> >>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>> have two JIRA that are duplicates atm. We can scope them
> >>>>>>>>>>>> accordingly:
> >>>>>>>>>>>>>>>> one for PAPI only, and second as a dependent JIRA for DSL.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
> >>>>>>>>>>>>>>>>> Thanks for your feedback!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1. I was adding headers to KeyValue to support groupBy,
> >> but
> >>> I
> >>>>>>>>>>>> think
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> not necessary. It should be enough with mapping headers
> to
> >>>>>>>>>>>>> key/value
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> then group using current KeyValue structure.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on KV as
> >>>>>>>>>>>>> structure,
> >>>>>>>>>>>>>>>> hence
> >>>>>>>>>>>>>>>>> considering headers as part of stateful operations will
> >> not
> >>>>> fit
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> approach and increase complexity (I cannot think in a
> >>> use-case
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>> this).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3. and 4. Changes on 1. will solve this issue.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Probably I rush a bit proposing this change, I was not
> >> aware
> >>>>> of
> >>>>>>>>>>>>>> KIP-159
> >>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>> KAFKA-5632.
> >>>>>>>>>>>>>>>>> If KIP-159 is adopted and we reduce this KIP to add
> >> Headers
> >>> to
> >>>>>>>>>>>>>>>>> RecordContext will be enough, but I'm not sure about the
> >>> scope
> >>>>>>>>>>> of
> >>>>>>>>>>>>>>>> KIP-159.
> >>>>>>>>>>>>>>>>> If it includes stateful operations will be difficult to
> >>>>>>>>>>>> implemented
> >>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>> stated in 2.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Jorge.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> El mar., 26 dic. 2017 a las 20:04, Matthias J. Sax (<
> >>>>>>>>>>>>>>>> matthias@confluent.io>)
> >>>>>>>>>>>>>>>>> escribió:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for the KIP Jorge,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> As Bill pointed out already, we should be careful with
> >>> adding
> >>>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>> overloads as this contradicts the work done via KIP-182.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> This KIP also seems to be related to KIP-149 and
> KIP-159.
> >>> Are
> >>>>>>>>>>>> you
> >>>>>>>>>>>>>>> aware
> >>>>>>>>>>>>>>>>>> of them? Both have quite long DISCUSS threads, but it
> >> might
> >>>>> be
> >>>>>>>>>>>>> worth
> >>>>>>>>>>>>>>>>>> browsing through them.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> A few further questions:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>  - why do you want to add the headers to `KeyValue`? I
> am
> >>> not
> >>>>>>>>>>>> sure
> >>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> should consider headers as optional metadata and add it
> >> to
> >>>>>>>>>>>>>>>>>> `RecordContext` similar to timestamp, offset, etc. only
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>  - You only include stateless single-record
> >> transformations
> >>>>> at
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> DSL
> >>>>>>>>>>>>>>>>>> level. Do you suggest that all other operator just drop
> >>>>>>>>>>> headers
> >>>>>>>>>>>> on
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> floor?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>  - Why do you only want to put headers into in-memory
> and
> >>>>>>>>>>> cache
> >>>>>>>>>>>>> but
> >>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>> RocksDB store? What do you mean by "pass through"? IMHO,
> >>> all
> >>>>>>>>>>>>> stores
> >>>>>>>>>>>>>>>>>> should behave the same at DSL level.
> >>>>>>>>>>>>>>>>>>    -> if we store the headers in the state stores, what
> >> is
> >>>>> the
> >>>>>>>>>>>>>> upgrade
> >>>>>>>>>>>>>>>>>> path?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>  - Why do we need to store record header in state in the
> >>>>> first
> >>>>>>>>>>>>>> place,
> >>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>> we exclude stateful operator at DSL level?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> What is the motivation for the "border lines" you
> choose?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> >>>>>>>>>>>>>>>>>>> Jorge,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for the KIP, I know this is a feature others in
> >> the
> >>>>>>>>>>>>>> community
> >>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>> been interested in getting into Kafka Streams.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I took a quick pass over it, and I have one initial
> >>>>> question.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> We recently reduced overloads with KIP-182, and in this
> >>> KIP
> >>>>>>>>>>> we
> >>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>> increasing them again.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I can see from the KIP why they are necessary, but I'm
> >>>>>>>>>>>> wondering
> >>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>> is something else we can do to cut down on the
> overloads
> >>>>>>>>>>>>>>> introduced.  I
> >>>>>>>>>>>>>>>>>>> don't have any sound suggestions ATM, so I'll have to
> >>> think
> >>>>>>>>>>>> about
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>> more, but I wanted to put the thought out there.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate
> >>>>>>>>>>> Otoya <
> >>>>>>>>>>>>>>>>>>> quilcate.jorge@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I have created a KIP to add Record Headers support to
> >>> Kafka
> >>>>>>>>>>>>>> Streams
> >>>>>>>>>>>>>>>> API:
> >>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/
> confluence/display/KAFKA/KIP-
> >>>>>>>>>>>>>>>>>>>> 244%3A+Add+Record+Header+support+to+Kafka+Streams
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The main goal is to be able to use headers to filter,
> >> map
> >>>>>>>>>>> and
> >>>>>>>>>>>>>>> process
> >>>>>>>>>>>>>>>>>>>> records as streams. Stateful processing (joins,
> >> windows)
> >>>>> are
> >>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> considered.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Proposed changes/Draft:
> >>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:
> >>>>>>>>>>>>>>> streams-headers
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Feedback and suggestions are more than welcome.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Jorge.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>>
> >>
> >
> >
> >
>
>


-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message