kafka-dev mailing list archives

From Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com>
Subject Re: KIP-244: Add Record Header support to Kafka Streams
Date Mon, 14 May 2018 16:20:16 GMT
Yes, I've already created one: https://github.com/apache/kafka/pull/4955

On Mon, 14 May 2018, 17:55 Guozhang Wang, <wangguoz@gmail.com> wrote:

> Thanks Jorge, that sounds good to me.
>
> Also please feel free to send out the PR for reviews while the KIP is being
> voted on.
>
>
> Guozhang
>
>
> On Mon, May 14, 2018 at 8:31 AM, Jorge Esteban Quilcate Otoya <
> quilcate.jorge@gmail.com> wrote:
>
> > Thanks for your feedback everyone!
> >
> > If there are no more comments on this KIP, I think we can open the VOTE
> > thread.
> >
> > Cheers,
> > Jorge.
> >
> > On Sat, May 12, 2018 at 2:02, Guozhang Wang (<wangguoz@gmail.com>)
> > wrote:
> >
> > > Yeah I'm only talking about the DSL part (i.e. how stateful / stateless
> > > operators default inheritance protocol would be promised) to be managed
> > > with KIP-159.
> > >
> > > For allowing users to override the default behavior in PAPI, that would
> > be
> > > in a different KIP.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, May 11, 2018 at 10:41 AM, Matthias J. Sax <
> matthias@confluent.io
> > >
> > > wrote:
> > >
> > > > I am actually not sure about this. Because it's about the semantics
> at
> > > > PAPI level, but KIP-159 targets the DSL, it might actually be better
> to
> > > > have a separate KIP?
> > > >
> > > > -Matthias
> > > >
> > > > On 5/11/18 9:26 AM, Guozhang Wang wrote:
> > > > > That's a good question. I think we can manage this in KIP-159. I
> will
> > > go
> > > > > ahead and try to augment that KIP together with the original author
> > > > Jeyhun.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Fri, May 11, 2018 at 12:45 AM, Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jorge@gmail.com> wrote:
> > > > >
> > > > >> Thanks Guozhang and Matthias! I do also agree with this way of
> > > handling
> > > > >> headers inheritance. I will add them to the KIP doc.
> > > > >>
> > > > > >>> We can discuss extending the current protocol, how to enable users
> > > > > >>> to override those rules, and how to expose them in the DSL layer in a
> > > > > >>> future KIP.
> > > > >>
> > > > >> About this, should this be managed on KIP-159 or a new one?
> > > > >>
> > > > > >> On Thu, May 10, 2018 at 17:46, Matthias J. Sax
> > > > > >> (<matthias@confluent.io>) wrote:
> > > > >>
> > > > >>> Thanks Guozhang! Sounds good to me!
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>> On 5/10/18 7:55 AM, Guozhang Wang wrote:
> > > > >>>> Thanks for your thoughts Matthias. I think if we do want to
> bring
> > > > >> KIP-244
> > > > >>>> into 2.0 then we need to keep its scope small and well defined.
> > For
> > > > >> that
> > > > >>>> I'm proposing:
> > > > >>>>
> > > > > >>>> 1. Make the inheritance implementation of headers consistent with what
> > > > > >>>> we had with other record context fields. I.e. pass through the record
> > > > > >>>> context in `context.forward()`. Note that within a processor node, users
> > > > > >>>> can already manipulate the Headers with the given APIs, so at the time
> > > > > >>>> of forwarding, the library can just copy whatever is left / updated to
> > > > > >>>> the next processor node.
> > > > >>>>
> > > > > >>>> 2. In the sink node, where a record is being sent to the Kafka topic,
> > > > > >>>> we should consider the following:
> > > > > >>>>
> > > > > >>>> a. For sink topics, we will set the headers into the producer record.
> > > > > >>>> b. For repartition topics, we will set the headers into the producer
> > > > > >>>> record.
> > > > > >>>> c. For changelog topics, we will drop the headers in the producer
> > > > > >>>> record since they will not be used in restoration and not stored in the
> > > > > >>>> state store either.
> > > > >>>>
> > > > >>>>
> > > > > >>>> We can discuss extending the current protocol, how to enable users
> > > > > >>>> to override those rules, and how to expose them in the DSL layer in a
> > > > > >>>> future KIP.
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> Guozhang
> > > > >>>>
> > > > >>>>
> > > > >>>> On Mon, May 7, 2018 at 5:49 PM, Matthias J. Sax <
> > > > matthias@confluent.io
> > > > >>>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Guozhang,
> > > > >>>>>
> > > > > >>>>> if you advocate to forward headers by default, it might be a better
> > > > > >>>>> default strategy to forward the headers for all operators (similar to
> > > > > >>>>> topic/partition/offset metadata). It's usually harder for users to
> > > > > >>>>> reason about different cases and thus I would prefer to have consistent
> > > > > >>>>> behavior, ie, only one default strategy instead of introducing
> > > > > >>>>> different cases.
> > > > >>>>>
> > > > > >>>>> Btw: My argument about dropping headers by default only implies that
> > > > > >>>>> users need to copy the headers explicitly to the output records in
> > > > > >>>>> their code if they want to inspect them later -- it does not imply that
> > > > > >>>>> headers cannot be forwarded downstream. (Not sure if this was clear).
> > > > >>>>>
> > > > > >>>>> I am also ok with copying by default though (for me, it's a 51/49
> > > > > >>>>> preference for dropping by default only).
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> -Matthias
> > > > >>>>>
> > > > >>>>> On 5/7/18 4:52 PM, Guozhang Wang wrote:
> > > > >>>>>> Hi Matthias,
> > > > >>>>>>
> > > > >>>>>> My concern of setting `null` in all cases is that it would
> make
> > > > >> headers
> > > > >>>>> not
> > > > >>>>>> very useful in KIP-244 then, because headers will only be
> > > available
> > > > >> at
> > > > >>>>> the
> > > > >>>>>> source stream / table, but not in any of the following
> > instances.
> > > In
> > > > >>>>>> practice users may be more likely to look into the headers
> later
> > > in
> > > > >> the
> > > > >>>>>> pipeline. Personally I'd suggest we pass the headers for all
> > > > >> stateless
> > > > >>>>>> operators in DSL and everywhere in PAPI's context.forward().
> For
> > > > >>>>>> repartition topics and sink topics, we also set them in the
> > > produced
> > > > >>>>>> records accordingly; for changelog topics, we do not set them
> > > since
> > > > >>> they
> > > > >>>>>> are not going to be used anywhere in the store.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> Guozhang
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Sun, May 6, 2018 at 9:03 PM, Matthias J. Sax <
> > > > >> matthias@confluent.io
> > > > >>>>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > > >>>>>>> I agree that we should not block this KIP if possible. Nevertheless,
> > > > > >>>>>>> we should try to get a reasonable default strategy for inheriting the
> > > > > >>>>>>> headers so we don't need to change it later on.
> > > > > >>>>>>>
> > > > > >>>>>>> Let's see what others think. I still tend slightly to set to `null`
> > > > > >>>>>>> by default for all cases. If the default strategy is different for
> > > > > >>>>>>> different operators as you suggest, it might be confusing to users.
> > > > > >>>>>>> IMHO, the default behavior should be as simple as possible.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> -Matthias
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On 5/6/18 8:53 PM, Guozhang Wang wrote:
> > > > > >>>>>>>> Matthias, thanks for sharing your opinions on the inheritance
> > > > > >>>>>>>> protocol of the record context. I'm thinking maybe we should make this
> > > > > >>>>>>>> discussion a separate KIP by itself? If yes, then KIP-244's scope
> > > > > >>>>>>>> would be smaller, and within KIP-244 we can have a simple inheritance
> > > > > >>>>>>>> rule of setting it to null when 1) going through stateful operators
> > > > > >>>>>>>> and 2) sending to any topics.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> Guozhang
> > > > >>>>>>>>
> > > > >>>>>>>> On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <
> > > > >>>>> matthias@confluent.io>
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Making the inheritance protocol a public contract seems
> > > > reasonable
> > > > >>> to
> > > > >>>>>>> me.
> > > > >>>>>>>>>
> > > > > >>>>>>>>> In the current implementation, all output records inherit the offset,
> > > > > >>>>>>>>> timestamp, topic, and partition metadata from the input record. We
> > > > > >>>>>>>>> already added an API to change the timestamp explicitly for the output
> > > > > >>>>>>>>> record though.
> > > > >>>>>>>>>
> > > > > >>>>>>>>> I think it makes sense to keep the inheritance of offset, topic, and
> > > > > >>>>>>>>> partition. For headers, it's worth discussing. I see arguments for two
> > > > > >>>>>>>>> strategies: (1) inherit by default, (2) set `null` by default.
> > > > > >>>>>>>>> Independent of the default behavior, we should add an API to set
> > > > > >>>>>>>>> headers for output records explicitly though (similar to the "set
> > > > > >>>>>>>>> timestamp API").
> > > > >>>>>>>>>
> > > > > >>>>>>>>> From my point of view, timestamp/headers are a different
> > > > > >>>>>>>>> "class/category" of data/metadata than topic/partition/offset. For the
> > > > > >>>>>>>>> first category, it makes sense to manipulate them and it's more than
> > > > > >>>>>>>>> "plain metadata"; especially the timestamp. For the second category it
> > > > > >>>>>>>>> does not make sense to manipulate it, and to me topic/partition/offset
> > > > > >>>>>>>>> is pure metadata only---strictly speaking, it's even questionable if
> > > > > >>>>>>>>> output records should have any value for topic/partition/offset in the
> > > > > >>>>>>>>> first place, or if they should be `null`, because those attributes
> > > > > >>>>>>>>> only make sense for source records that are consumed from a topic
> > > > > >>>>>>>>> directly. On the other hand, if we make this difference explicit,
> > > > > >>>>>>>>> it might be useful information for the user to track the current
> > > > > >>>>>>>>> topic/partition/offset of the original source record.
> > > > >>>>>>>>>
> > > > > >>>>>>>>> Furthermore, to me, timestamps and headers are somewhat different, too.
> > > > > >>>>>>>>> For stream processing it's required that every record has a timestamp;
> > > > > >>>>>>>>> thus, it makes sense to inherit the input record timestamp by default (a
> > > > > >>>>>>>>> timestamp is not really metadata but actually equally important to key
> > > > > >>>>>>>>> and value from my point of view). Headers, however, are optional, and
> > > > > >>>>>>>>> thus inheriting them is not really required. It might be convenient
> > > > > >>>>>>>>> though: for example, imagine a simple "filter-only" application -- it
> > > > > >>>>>>>>> would be cumbersome for users to explicitly copy the headers from the
> > > > > >>>>>>>>> input records to the output records -- it seems to be unnecessary
> > > > > >>>>>>>>> boilerplate code. On the other hand, for any other more complex use
> > > > > >>>>>>>>> case, it's questionable to inherit headers---note, that headers would be
> > > > > >>>>>>>>> written to the output topics increasing the size of the messages.
> > > > > >>>>>>>>> Overall, I am not sure which default strategy might be the better one
> > > > > >>>>>>>>> for headers. Is there a convincing argument for either one of them? I
> > > > > >>>>>>>>> slightly tend to think that using `null` as default might be better.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Last, we could also make the default behavior configurable.
> > > > >>> Something
> > > > >>>>>>>>> like `inherit.record.headers=true/false` with default value
> > > > >> "false".
> > > > >>>>>>>>> This would allow people to opt-in for
> > auto-header-inheritance.
> > > > >> Just
> > > > >>> an
> > > > >>>>>>>>> idea I wanted to add to the discussion---not sure if it's a
> > > good
> > > > >>> one.
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> -Matthias
> > > > >>>>>>>>>
> > > > >>>>>>>>> On 5/4/18 3:13 PM, Guozhang Wang wrote:
> > > > >>>>>>>>>> Hello Jorge,
> > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` being part of
> > > > > >>>>>>>>>>> `RecordContext` would be handled the same way as other attributes.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Today we do not have a clear inheritance protocol for
> other
> > > > >> fields
> > > > >>> of
> > > > >>>>>>>>>> RecordContext yet: although internally we do have some
> > > criterion
> > > > >> on
> > > > >>>>>>>>>> topic/partition/offset and timestamp, they are not
> > explicitly
> > > > >>> exposed
> > > > >>>>>>> to
> > > > >>>>>>>>>> users.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I think we still need to have a defined protocol for headers itself,
> > > > > >>>>>>>>>> but I agree that it is better scoped outside of this KIP, since this
> > > > > >>>>>>>>>> inheritance protocol for all the fields of RecordContext would better
> > > > > >>>>>>>>>> be a separate KIP. We can document this clearly in the wiki page.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Guozhang
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <
> > > > >>>>>>>>>> garcia.florian.perso@gmail.com> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Hi,
> > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> For me this is a great first step to have Headers in streaming.
> > > > > >>>>>>>>>>> My current use case is about distributed tracing (Zipkin) and with the
> > > > > >>>>>>>>>>> headers in the processorContext() I'll be able to manage that for most
> > > > > >>>>>>>>>>> cases.
> > > > > >>>>>>>>>>> KIP-159 should follow after this but this is where all the major
> > > > > >>>>>>>>>>> questions will arise for stateful operations (as Guozhang said).
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for the work on this Jorge.
> > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya
> > > > > >>>>>>>>>>> <quilcate.jorge@gmail.com> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks Guozhang and John for your feedback.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of
> > headers
> > > in
> > > > >>> our
> > > > >>>>>>>>>>>> topology:
> > > > >>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be
> > > > >>>>>>> straight-forward.
> > > > >>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be
> > > > >> straight-forward.
> > > > >>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates
> and
> > > > >> joins?
> > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` being part of
> > > > > >>>>>>>>>>>> `RecordContext` would be handled the same way as other attributes.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers
> > to
> > > > >>>>>>>>>>>> filter/map/branch",
> > > > >>>>>>>>>>>> it may well be covered in KIP-159; worth taking a look
> at
> > > that
> > > > >>> KIP.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Yes, I will point to it.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 2. In terms of internal implementations, should the
> state
> > > > >> store
> > > > >>>>>>>>>>>> cache include the headers then in order to be sent
> > > > downstreams?
> > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Good question. As `LRUCacheEntry` extends `RecordContext`, I think
> > > > > >>>>>>>>>>>> this is already supported. I will detail this on the KIP.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers
> > headers)",
> > > > >> this
> > > > >>>>>>> should
> > > > >>>>>>>>>>> be
> > > > >>>>>>>>>>>> removed?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Fixed, thanks.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our
> > > scope
> > > > >> is
> > > > >>>>>>> only
> > > > >>>>>>>>>>>> for exposing
> > > > >>>>>>>>>>>> the headers for reading, and not allowing users to add /
> > > > modify
> > > > >>>>>>>>> headers,
> > > > >>>>>>>>>>>> right? If yes, we'd better state it clearly at the
> > "Proposed
> > > > >>>>> Changes"
> > > > >>>>>>>>>>>> section.
> > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> As headers are exposed in the `ProcessorContext`, and headers will be
> > > > > >>>>>>>>>>>> sent downstream, they can be mutated (add/remove headers).
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>  > Also, despite the decreased scope in this KIP, I
> think
> > it
> > > > >>> might
> > > > >>>>> be
> > > > >>>>>>>>>>>> valuable to define what will happen to headers once this
> > > > change
> > > > >>> is
> > > > >>>>>>>>>>>> implemented. For example, I think a minimal
> > groundwork-level
> > > > >>> change
> > > > >>>>>>>>> might
> > > > >>>>>>>>>>>> be to make the API changes, while promising to drop all
> > > > headers
> > > > >>>>> from
> > > > >>>>>>>>>>> input
> > > > >>>>>>>>>>>> records.
> > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> I would suggest passing headers to downstream nodes and not dropping
> > > > > >>>>>>>>>>>> them. Clients will have to drop `Headers` if they have used them.
> > > > > >>>>>>>>>>>> Or it could be something like a boolean config property that manages
> > > > > >>>>>>>>>>>> this. I would like to hear feedback here.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> A maximal groundwork change would be to forward the
> > headers
> > > > >>>>> through
> > > > >>>>>>>>> all
> > > > >>>>>>>>>>>> operators
> > > > >>>>>>>>>>>> in
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Streams. But I think there are some unresolved questions
> > > about
> > > > >>>>>>>>>>> forwarding,
> > > > >>>>>>>>>>>> like "what happens to the headers in a join?"
> > > > > >>>>>>>>>>>> Probably this would be solved once KIP-159 is implemented and
> > > > > >>>>>>>>>>>> supporting Headers.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> There's of course some middle ground, but
> instinctively,
> > I
> > > > >> think
> > > > >>>>> I'd
> > > > >>>>>>>>>>>> prefer to have a clear definition that headers are
> > currently
> > > > >>> *not*
> > > > >>>>>>>>>>>> forwarded, rather than having a complex list of
> operators
> > > that
> > > > >> do
> > > > >>>>> or
> > > > >>>>>>>>>>> don't
> > > > >>>>>>>>>>>> forward them. Plus, I think it might be tricky to define
> > > this
> > > > >>>>>>> behavior
> > > > >>>>>>>>>>>> while not allowing the scope to return to that of your
> > > > original
> > > > >>>>>>>>> proposal!
> > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Agree. But `Headers` were forwarded *explicitly* in the original
> > > > > >>>>>>>>>>>> proposal. The current one passes them as part of `RecordContext`, so
> > > > > >>>>>>>>>>>> whether they are forwarded or not is the same as for `RecordContext`.
> > > > > >>>>>>>>>>>> On top of this implementation, we can design how filter/map/join will
> > > > > >>>>>>>>>>>> be handled, probably following the KIP-159 approach.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>>> Jorge.
> > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> On Wed, May 2, 2018 at 22:56, Guozhang Wang (<wangguoz@gmail.com>)
> > > > > >>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Hi Jorge,
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks for the written KIP! Made a pass over it and
> left
> > > some
> > > > >>>>>>> comments
> > > > >>>>>>>>>>>>> (some of them overlapped with John's):
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of
> > headers
> > > in
> > > > >>> our
> > > > >>>>>>>>>>>> topology:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be
> > > > >>>>>>> straight-forward.
> > > > >>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be
> > > > >> straight-forward.
> > > > >>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates
> and
> > > > >> joins?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 2. In terms of internal implementations, should the
> state
> > > > >> store
> > > > >>>>>>> cache
> > > > >>>>>>>>>>>>> include the headers then in order to be sent
> downstreams?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers
> > to
> > > > >>>>>>>>>>>>> filter/map/branch", it may well be covered in KIP-159;
> > > worth
> > > > >>>>> taking
> > > > >>>>>>> a
> > > > >>>>>>>>>>>> look
> > > > >>>>>>>>>>>>> at that KIP.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers
> > headers)",
> > > > >> this
> > > > >>>>>>> should
> > > > >>>>>>>>>>> be
> > > > >>>>>>>>>>>>> removed?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our
> > > scope
> > > > >> is
> > > > >>>>>>> only
> > > > >>>>>>>>>>> for
> > > > >>>>>>>>>>>>> exposing the headers for reading, and not allowing
> users
> > to
> > > > >> add
> > > > >>> /
> > > > >>>>>>>>>>> modify
> > > > >>>>>>>>>>>>> headers, right? If yes, we'd better state it clearly at
> > the
> > > > >>>>>>> "Proposed
> > > > >>>>>>>>>>>>> Changes" section.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Guozhang
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <
> > > > >> john@confluent.io
> > > > >>>>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Hi Jorge,
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks for the design work.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I agree that de-scoping the work to just the Processor
> > API
> > > > >> will
> > > > >>>>>>> help
> > > > >>>>>>>>>>>>>> contain the design and implementation complexity.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> In the KIP, it mentions that the headers would be
> > > available
> > > > >> in
> > > > >>>>> the
> > > > >>>>>>>>>>>>>> ProcessorContext, (like "context.headers()"). It also
> > says
> > > > >> that
> > > > >>>>>>>>>>>>>> implementers would need to implement the method "void
> > > > >> process(K
> > > > >>>>>>> key,
> > > > >>>>>>>>>>> V
> > > > >>>>>>>>>>>>>> value, Headers headers);". I think maybe you meant to
> > > remove
> > > > >>> the
> > > > >>>>>>>>>>>> proposal
> > > > >>>>>>>>>>>>>> to modify "process", since it wouldn't be necessary in
> > > > >>>>> conjunction
> > > > >>>>>>>>>>> with
> > > > >>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>> ProcessorContext change, and it's not represented in
> > your
> > > > PR.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Also, despite the decreased scope in this KIP, I think
> > it
> > > > >> might
> > > > >>>>> be
> > > > >>>>>>>>>>>>> valuable
> > > > >>>>>>>>>>>>>> to define what will happen to headers once this change
> > is
> > > > >>>>>>>>>>> implemented.
> > > > >>>>>>>>>>>>> For
> > > > >>>>>>>>>>>>>> example, I think a minimal groundwork-level change
> might
> > > be
> > > > >> to
> > > > >>>>> make
> > > > >>>>>>>>>>> the
> > > > >>>>>>>>>>>>> API
> > > > >>>>>>>>>>>>>> changes, while promising to drop all headers from
> input
> > > > >>> records.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> A maximal groundwork change would be to forward the
> > > headers
> > > > >>>>> through
> > > > >>>>>>>>>>> all
> > > > >>>>>>>>>>>>>> operators in Streams. But I think there are some
> > > unresolved
> > > > >>>>>>> questions
> > > > >>>>>>>>>>>>> about
> > > > >>>>>>>>>>>>>> forwarding, like "what happens to the headers in a
> > join?"
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> There's of course some middle ground, but
> > instinctively, I
> > > > >>> think
> > > > >>>>>>> I'd
> > > > >>>>>>>>>>>>> prefer
> > > > >>>>>>>>>>>>>> to have a clear definition that headers are currently
> > > *not*
> > > > >>>>>>>>>>> forwarded,
> > > > >>>>>>>>>>>>>> rather than having a complex list of operators that do
> > or
> > > > >> don't
> > > > >>>>>>>>>>> forward
> > > > >>>>>>>>>>>>>> them. Plus, I think it might be tricky to define this
> > > > >> behavior
> > > > >>>>>>> while
> > > > >>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>> allowing the scope to return to that of your original
> > > > >> proposal!
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks again for the KIP,
> > > > >>>>>>>>>>>>>> -John
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate
> > > Otoya
> > > > >> <
> > > > >>>>>>>>>>>>>> quilcate.jorge@gmail.com> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Hi Matthias,
> > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> I've created a new JIRA to track this, updated the KIP and created a
> > > > > >>>>>>>>>>>>>>> PR.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Looking forward to your feedback,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Jorge.
> > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax
> > > > > >>>>>>>>>>>>>>> (<matthias@confluent.io>) wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Hi Jorge,
> > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> I would like to unblock this KIP to make some progress. The tricky
> > > > > >>>>>>>>>>>>>>>> question of this work seems to be how to expose headers at DSL level.
> > > > > >>>>>>>>>>>>>>>> This is related to KIP-149 and KIP-159. However, for Processor API, it
> > > > > >>>>>>>>>>>>>>>> seems to be rather straightforward to add headers to the API.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Thus, I would suggest to de-scope this KIP and add
> > > header
> > > > >>>>> support
> > > > >>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>>>> Processor API only as a first step. If this is done,
> > we
> > > > can
> > > > >>> see
> > > > >>>>>>>>>>> in
> > > > >>>>>>>>>>>> a
> > > > >>>>>>>>>>>>>>>> second step, how to add headers at DSL level.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> WDYT about this proposal?
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> If you agree, please update the JIRA and KIP
> > > accordingly.
> > > > >>> Note,
> > > > >>>>>>>>>>>> that
> > > > >>>>>>>>>>>>> we
> > > > >>>>>>>>>>>>>>>> have two JIRA that are duplicates atm. We can scope
> > them
> > > > >>>>>>>>>>>> accordingly:
> > > > >>>>>>>>>>>>>>>> one for PAPI only, and second as a dependent JIRA
> for
> > > DSL.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> -Matthias
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya
> > wrote:
> > > > >>>>>>>>>>>>>>>>> Thanks for your feedback!
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> 1. I was adding headers to KeyValue to support
> > groupBy,
> > > > >> but
> > > > >>> I
> > > > >>>>>>>>>>>> think
> > > > >>>>>>>>>>>>>> it
> > > > >>>>>>>>>>>>>>> is
> > > > >>>>>>>>>>>>>>>>> not necessary. It should be enough with mapping
> > headers
> > > > to
> > > > >>>>>>>>>>>>> key/value
> > > > >>>>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>>>> then group using current KeyValue structure.
> > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on KV as structure,
> > > > > >>>>>>>>>>>>>>>>> hence considering headers as part of stateful operations will not fit
> > > > > >>>>>>>>>>>>>>>>> in this approach and increase complexity (I cannot think of a use case
> > > > > >>>>>>>>>>>>>>>>> that needs this).
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> 3. and 4. Changes on 1. will solve this issue.
> > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Probably I rushed a bit proposing this change, I was not aware of
> > > > > >>>>>>>>>>>>>>>>> KIP-159 or KAFKA-5632.
> > > > > >>>>>>>>>>>>>>>>> If KIP-159 is adopted, reducing this KIP to adding Headers to
> > > > > >>>>>>>>>>>>>>>>> RecordContext will be enough, but I'm not sure about the scope of
> > > > > >>>>>>>>>>>>>>>>> KIP-159. If it includes stateful operations, it will be difficult to
> > > > > >>>>>>>>>>>>>>>>> implement, as stated in 2.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>>>>>>>> Jorge.
> > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax
> > > > > >>>>>>>>>>>>>>>>> (<matthias@confluent.io>) wrote:
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Thanks for the KIP Jorge,
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> As Bill pointed out already, we should be careful
> > with
> > > > >>> adding
> > > > >>>>>>>>>>>> new
> > > > >>>>>>>>>>>>>>>>>> overloads as this contradicts the work done via
> > > KIP-182.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> This KIP also seems to be related to KIP-149 and
> > > > KIP-159.
> > > > >>> Are
> > > > >>>>>>>>>>>> you
> > > > >>>>>>>>>>>>>>> aware
> > > > >>>>>>>>>>>>>>>>>> of them? Both have quite long DISCUSS threads, but
> > it
> > > > >> might
> > > > >>>>> be
> > > > >>>>>>>>>>>>> worth
> > > > >>>>>>>>>>>>>>>>>> browsing through them.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> A few further questions:
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>  - why do you want to add the headers to
> > `KeyValue`? I
> > > > am
> > > > >>> not
> > > > >>>>>>>>>>>> sure
> > > > >>>>>>>>>>>>>> if
> > > > >>>>>>>>>>>>>>> we
> > > > >>>>>>>>>>>>>>>>>> should consider headers as optional metadata and
> add
> > > it
> > > > >> to
> > > > >>>>>>>>>>>>>>>>>> `RecordContext` similar to timestamp, offset, etc.
> > > only
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>  - You only include stateless single-record
> > > > >> transformations
> > > > >>>>> at
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>> DSL
> > > > >>>>>>>>>>>>>>>>>> level. Do you suggest that all other operator just
> > > drop
> > > > >>>>>>>>>>> headers
> > > > >>>>>>>>>>>> on
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>> floor?
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>  - Why do you only want to put headers into
> > in-memory
> > > > and
> > > > >>>>>>>>>>> cache
> > > > >>>>>>>>>>>>> but
> > > > >>>>>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>>>>>> RocksDB store? What do you mean by "pass through"?
> > > IMHO,
> > > > >>> all
> > > > >>>>>>>>>>>>> stores
> > > > >>>>>>>>>>>>>>>>>> should behave the same at DSL level.
> > > > >>>>>>>>>>>>>>>>>>    -> if we store the headers in the state stores,
> > > what
> > > > >> is
> > > > >>>>> the
> > > > >>>>>>>>>>>>>> upgrade
> > > > >>>>>>>>>>>>>>>>>> path?
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>  - Why do we need to store record header in state
> in
> > > the
> > > > >>>>> first
> > > > >>>>>>>>>>>>>> place,
> > > > >>>>>>>>>>>>>>> if
> > > > >>>>>>>>>>>>>>>>>> we exclude stateful operator at DSL level?
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> What is the motivation for the "border lines" you
> > > > choose?
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> -Matthias
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> > > > >>>>>>>>>>>>>>>>>>> Jorge,
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Thanks for the KIP, I know this is a feature
> others
> > > in
> > > > >> the
> > > > >>>>>>>>>>>>>> community
> > > > >>>>>>>>>>>>>>>> have
> > > > >>>>>>>>>>>>>>>>>>> been interested in getting into Kafka Streams.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> I took a quick pass over it, and I have one
> initial
> > > > >>>>> question.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> We recently reduced overloads with KIP-182, and
> in
> > > this
> > > > >>> KIP
> > > > >>>>>>>>>>> we
> > > > >>>>>>>>>>>>> are
> > > > >>>>>>>>>>>>>>>>>>> increasing them again.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> I can see from the KIP why they are necessary,
> but
> > > I'm
> > > > >>>>>>>>>>>> wondering
> > > > >>>>>>>>>>>>> if
> > > > >>>>>>>>>>>>>>>> there
> > > > >>>>>>>>>>>>>>>>>>> is something else we can do to cut down on the
> > > > overloads
> > > > >>>>>>>>>>>>>>> introduced.  I
> > > > >>>>>>>>>>>>>>>>>>> don't have any sound suggestions ATM, so I'll
> have
> > to
> > > > >>> think
> > > > >>>>>>>>>>>> about
> > > > >>>>>>>>>>>>>> it
> > > > >>>>>>>>>>>>>>>> some
> > > > >>>>>>>>>>>>>>>>>>> more, but I wanted to put the thought out there.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>>>>>>>>> Bill
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban
> > > Quilcate
> > > > >>>>>>>>>>> Otoya <
> > > > >>>>>>>>>>>>>>>>>>> quilcate.jorge@gmail.com> wrote:
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Hi all,
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> I have created a KIP to add Record Headers
> support
> > > to
> > > > >>> Kafka
> > > > >>>>>>>>>>>>>> Streams
> > > > >>>>>>>>>>>>>>>> API:
> > > > >>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/
> > > > confluence/display/KAFKA/KIP-
> > > > >>>>>>>>>>>>>>>>>>>>
> 244%3A+Add+Record+Header+support+to+Kafka+Streams
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> The main goal is to be able to use headers to
> > > filter,
> > > > >> map
> > > > >>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>> process
> > > > >>>>>>>>>>>>>>>>>>>> records as streams. Stateful processing (joins,
> > > > >> windows)
> > > > >>>>> are
> > > > >>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>>>>>>>> considered.
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Proposed changes/Draft:
> > > > >>>>>>>>>>>>>>>>>>>>
> > > https://github.com/apache/kafka/compare/trunk...jeqo:
> > > > >>>>>>>>>>>>>>> streams-headers
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Feedback and suggestions are more than welcome.
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Jorge.
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> --
> > > > >>>>>>>>>>>>> -- Guozhang
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>>
> > > > >>
> > > > >
> > > > >
> > > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
