kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: KIP-244: Add Record Header support to Kafka Streams
Date Mon, 07 May 2018 03:53:09 GMT
Matthias, thanks for sharing your opinions in the inheritance protocol of
the record context. I'm thinking maybe we should make this discussion as a
separate KIP by itself? If yes, then KIP-244's scope would be smaller, and
within KIP-244 we can have a simple inheritance rule that setting it to
null when 1) going through stateful operators and 2) sending to any topics.


Guozhang

On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> Making the inheritance protocol a public contract seems reasonable to me.
>
> In the current implementation, all output records inherits the offset,
> timestamp, topic, and partition metadata from the input record. We
> already added an API to change the timestamp explicitly for the output
> record thought.
>
> I think it make sense to keep the inheritance of offset, topic, and
> partition. For headers, it's worth to discuss. I see arguments for two
> strategies: (1) inherit by default, (2) set `null` by default.
> Independent of the default behavior, we should add an API to set headers
> for output records explicitly though (similar to the "set timestamp API").
>
> From my point of view, timestamp/headers are a different
> "class/category" of data/metadata than topic/partition/offset. For the
> first category, it makes sense to manipulate them and it's more than
> "plain metadata"; especially the timestamp. For the second category it
> does not make sense to manipulate it, and to me topic/partition/offset
> is pure metadata only---strictly speaking, it's even questionable if
> output records should have any value for topic/partition/offset in the
> first place, or if they should be `null`, because those attributes do
> only make sense for source records that are consumed from a topic
> directly only. On the other hand, if we make this difference explicit,
> it might be useful information for the use to track the current
> topic/partition/offset of the original source record.
>
> Furthermore, to me, timestamps and headers are somewhat different, too.
> For stream processing it's required that every record has a timestamp;
> thus, it make sense to inherit the input record timestamp by default (a
> timestamp is not really metadata but actually equally important to key
> and value from my point of view). Header however are optional, and thus
> inheriting them is not really required. It might be convenient though:
> for example, imagine a simple "filter-only" application -- it would be
> cumbersome for users to explicitly copy the headers from the input
> records to the output records -- it seems to be unnecessary boilerplate
> code. On the other hand, for any other more complex use case, it's
> questionable to inherit headers---note, that headers would be written to
> the output topics increasing the size of the messages. Overall, I am not
> sure which default strategy might be the better one for headers. Is
> there a convincing argument for either one of them? I slightly tend to
> think that using `null` as default might be better.
>
> Last, we could also make the default behavior configurable. Something
> like `inherit.record.headers=true/false` with default value "false".
> This would allow people to opt-in for auto-header-inheritance. Just an
> idea I wanted to add to the discussion---not sure if it's a good one.
>
>
> -Matthias
>
> On 5/4/18 3:13 PM, Guozhang Wang wrote:
> > Hello Jorge,
> >
> >> Agree. Probably point 3 handles this. `Headers` been part of
> `RecordContext`
> > would be handled the same way as other attributes.
> >
> > Today we do not have a clear inheritance protocol for other fields of
> > RecordContext yet: although internally we do have some criterion on
> > topic/partition/offset and timestamp, they are not explicitly exposed to
> > users.
> >
> >
> > I think we still need to have a defined protocol for headers itself, but
> I
> > agree that it better to be scoped out side of this KIP, since this
> > inheritance protocol itself for all the fields of RecordContext would
> > better be a separate KIP. We can document this clearly in the wiki page.
> >
> > Guozhang
> >
> >
> > On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <
> > garcia.florian.perso@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> For me this is a great first step to have Headers in streaming.
> >> My current use case is about distributed tracing (Zipkin) and with the
> >> headers in the processorContext() I'll be able to manage that for the
> most
> >> cases.
> >> The KIP-159 should follow after this but this is where all the major
> >> questions will arise for stateful operations (as Guozhang said).
> >>
> >> Thanks for the work on this Jorge.
> >>
> >> Le ven. 4 mai 2018 à 01:04, Jorge Esteban Quilcate Otoya <
> >> quilcate.jorge@gmail.com> a écrit :
> >>
> >>> Thanks Guozhang and John for your feedback.
> >>>
> >>>> 1. We need to have a clear inheritance protocol of headers in our
> >>> topology:
> >>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
> >>>> 1.b. In DSL stateless operators, it should be straight-forward.
> >>>> 1.c. What about in stateful operators like aggregates and joins?
> >>>
> >>> Agree. Probably point 3 handles this. `Headers` been part of
> >>> `RecordContext` would be handled the same way as other attributes.
> >>>
> >>>> 3. In future work "Adding DSL Processors to use Headers to
> >>> filter/map/branch",
> >>> it may well be covered in KIP-159; worth taking a look at that KIP.
> >>>
> >>> Yes, I will point to it.
> >>>
> >>>> 2. In terms of internal implementations, should the state store
> >>> cache include the headers then in order to be sent downstreams?
> >>>
> >>> Good question. As `LRUCacheEntry` extends `RecordContext`, I thinks
> this
> >> is
> >>> already supported. I will detail this on the KIP.
> >>>
> >>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should
> >> be
> >>> removed?
> >>>
> >>> Fixed, thanks.
> >>>
> >>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only
> >>> for exposing
> >>> the headers for reading, and not allowing users to add / modify
> headers,
> >>> right? If yes, we'd better state it clearly at the "Proposed Changes"
> >>> section.
> >>>
> >>> As headers is exposed in the `ProcessContext`, and headers will be send
> >>> downstream, it can be mutated (add/remove headers).
> >>>
> >>>  > Also, despite the decreased scope in this KIP, I think it might be
> >>> valuable to define what will happen to headers once this change is
> >>> implemented. For example, I think a minimal groundwork-level change
> might
> >>> be to make the API changes, while promising to drop all headers from
> >> input
> >>> records.
> >>>
> >>> I will suggest to pass headers to downstream nodes, and don't drop
> yhrm.
> >>> Clients will have to drop `Headers` if they have used them.
> >>> Or it could be something like a boolean config property that manage
> this.
> >>> I would like to hear feedback here.
> >>>
> >>>> A maximal groundwork change would be to forward the headers through
> all
> >>> operators
> >>> in
> >>>
> >>> Streams. But I think there are some unresolved questions about
> >> forwarding,
> >>> like "what happens to the headers in a join?"
> >>> Probably this would be solve once KIP-159 is implemented and supporting
> >>> Headers.
> >>>
> >>>> There's of course some middle ground, but instinctively, I think I'd
> >>> prefer to have a clear definition that headers are currently *not*
> >>> forwarded, rather than having a complex list of operators that do or
> >> don't
> >>> forward them. Plus, I think it might be tricky to define this behavior
> >>> while not allowing the scope to return to that of your original
> proposal!
> >>>
> >>> Agree. But `Headers` were forwarded *explicitly* in the original
> >> proposal.
> >>> The current one pass it as part of `RecordContext`, so if it's forward
> it
> >>> or not is as the same as `RecordContext`.
> >>> On top of this implementation, we can design how filter/map/join will
> be
> >>> handled. Probably following KIP-159 approach.
> >>>
> >>> Cheers,
> >>> Jorge.
> >>>
> >>> El mié., 2 may. 2018 a las 22:56, Guozhang Wang (<wangguoz@gmail.com>)
> >>> escribió:
> >>>
> >>>> Hi Jorge,
> >>>>
> >>>> Thanks for the written KIP! Made a pass over it and left some comments
> >>>> (some of them overlapped with John's):
> >>>>
> >>>> 1. We need to have a clear inheritance protocol of headers in our
> >>> topology:
> >>>>
> >>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
> >>>> 1.b. In DSL stateless operators, it should be straight-forward.
> >>>> 1.c. What about in stateful operators like aggregates and joins?
> >>>>
> >>>> 2. In terms of internal implementations, should the state store cache
> >>>> include the headers then in order to be sent downstreams?
> >>>>
> >>>> 3. In future work "Adding DSL Processors to use Headers to
> >>>> filter/map/branch", it may well be covered in KIP-159; worth taking
a
> >>> look
> >>>> at that KIP.
> >>>>
> >>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should
> >> be
> >>>> removed?
> >>>>
> >>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only
> >> for
> >>>> exposing the headers for reading, and not allowing users to add /
> >> modify
> >>>> headers, right? If yes, we'd better state it clearly at the "Proposed
> >>>> Changes" section.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <john@confluent.io>
> >> wrote:
> >>>>
> >>>>> Hi Jorge,
> >>>>>
> >>>>> Thanks for the design work.
> >>>>>
> >>>>> I agree that de-scoping the work to just the Processor API will
help
> >>>>> contain the design and implementation complexity.
> >>>>>
> >>>>> In the KIP, it mentions that the headers would be available in the
> >>>>> ProcessorContext, (like "context.headers()"). It also says that
> >>>>> implementers would need to implement the method "void process(K
key,
> >> V
> >>>>> value, Headers headers);". I think maybe you meant to remove the
> >>> proposal
> >>>>> to modify "process", since it wouldn't be necessary in conjunction
> >> with
> >>>> the
> >>>>> ProcessorContext change, and it's not represented in your PR.
> >>>>>
> >>>>> Also, despite the decreased scope in this KIP, I think it might
be
> >>>> valuable
> >>>>> to define what will happen to headers once this change is
> >> implemented.
> >>>> For
> >>>>> example, I think a minimal groundwork-level change might be to make
> >> the
> >>>> API
> >>>>> changes, while promising to drop all headers from input records.
> >>>>>
> >>>>> A maximal groundwork change would be to forward the headers through
> >> all
> >>>>> operators in Streams. But I think there are some unresolved questions
> >>>> about
> >>>>> forwarding, like "what happens to the headers in a join?"
> >>>>>
> >>>>> There's of course some middle ground, but instinctively, I think
I'd
> >>>> prefer
> >>>>> to have a clear definition that headers are currently *not*
> >> forwarded,
> >>>>> rather than having a complex list of operators that do or don't
> >> forward
> >>>>> them. Plus, I think it might be tricky to define this behavior while
> >>> not
> >>>>> allowing the scope to return to that of your original proposal!
> >>>>>
> >>>>> Thanks again for the KIP,
> >>>>> -John
> >>>>>
> >>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <
> >>>>> quilcate.jorge@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Matthias,
> >>>>>>
> >>>>>> I've created a new JIRA to track this, updated the KIP and create
a
> >>> PR.
> >>>>>>
> >>>>>> Looking forward to your feedback,
> >>>>>>
> >>>>>> Jorge.
> >>>>>>
> >>>>>> El mar., 13 feb. 2018 a las 22:43, Matthias J. Sax (<
> >>>>> matthias@confluent.io
> >>>>>>> )
> >>>>>> escribió:
> >>>>>>
> >>>>>>> Hi Jorge,
> >>>>>>>
> >>>>>>> I would like to unblock this KIP to make some progress.
The
> >> tricky
> >>>>>>> question of this work, seems to be how to expose headers
at DSL
> >>>> level.
> >>>>>>> This related to KIP-149 and KIP-159. However, for Processor
API,
> >> it
> >>>>>>> seems to be rather straight forward to add headers to the
API.
> >>>>>>>
> >>>>>>> Thus, I would suggest to de-scope this KIP and add header
support
> >>> for
> >>>>>>> Processor API only as a first step. If this is done, we
can see
> >> in
> >>> a
> >>>>>>> second step, how to add headers at DSL level.
> >>>>>>>
> >>>>>>> WDYT about this proposal?
> >>>>>>>
> >>>>>>> If you agree, please update the JIRA and KIP accordingly.
Note,
> >>> that
> >>>> we
> >>>>>>> have two JIRA that are duplicates atm. We can scope them
> >>> accordingly:
> >>>>>>> one for PAPI only, and second as a dependent JIRA for DSL.
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
> >>>>>>>> Thanks for your feedback!
> >>>>>>>>
> >>>>>>>> 1. I was adding headers to KeyValue to support groupBy,
but I
> >>> think
> >>>>> it
> >>>>>> is
> >>>>>>>> not necessary. It should be enough with mapping headers
to
> >>>> key/value
> >>>>>> and
> >>>>>>>> then group using current KeyValue structure.
> >>>>>>>>
> >>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on
KV as
> >>>> structure,
> >>>>>>> hence
> >>>>>>>> considering headers as part of stateful operations will
not fit
> >>> in
> >>>>> this
> >>>>>>>> approach and increase complexity (I cannot think in
a use-case
> >>> that
> >>>>>> need
> >>>>>>>> this).
> >>>>>>>>
> >>>>>>>> 3. and 4. Changes on 1. will solve this issue.
> >>>>>>>>
> >>>>>>>> Probably I rush a bit proposing this change, I was not
aware of
> >>>>> KIP-159
> >>>>>>> or
> >>>>>>>> KAFKA-5632.
> >>>>>>>> If KIP-159 is adopted and we reduce this KIP to add
Headers to
> >>>>>>>> RecordContext will be enough, but I'm not sure about
the scope
> >> of
> >>>>>>> KIP-159.
> >>>>>>>> If it includes stateful operations will be difficult
to
> >>> implemented
> >>>>> as
> >>>>>>>> stated in 2.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Jorge.
> >>>>>>>>
> >>>>>>>> El mar., 26 dic. 2017 a las 20:04, Matthias J. Sax (<
> >>>>>>> matthias@confluent.io>)
> >>>>>>>> escribió:
> >>>>>>>>
> >>>>>>>>> Thanks for the KIP Jorge,
> >>>>>>>>>
> >>>>>>>>> As Bill pointed out already, we should be careful
with adding
> >>> new
> >>>>>>>>> overloads as this contradicts the work done via
KIP-182.
> >>>>>>>>>
> >>>>>>>>> This KIP also seems to be related to KIP-149 and
KIP-159. Are
> >>> you
> >>>>>> aware
> >>>>>>>>> of them? Both have quite long DISCUSS threads, but
it might be
> >>>> worth
> >>>>>>>>> browsing through them.
> >>>>>>>>>
> >>>>>>>>> A few further questions:
> >>>>>>>>>
> >>>>>>>>>  - why do you want to add the headers to `KeyValue`?
I am not
> >>> sure
> >>>>> if
> >>>>>> we
> >>>>>>>>> should consider headers as optional metadata and
add it to
> >>>>>>>>> `RecordContext` similar to timestamp, offset, etc.
only
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>>  - You only include stateless single-record transformations
at
> >>> the
> >>>>> DSL
> >>>>>>>>> level. Do you suggest that all other operator just
drop
> >> headers
> >>> on
> >>>>> the
> >>>>>>>>> floor?
> >>>>>>>>>
> >>>>>>>>>  - Why do you only want to put headers into in-memory
and
> >> cache
> >>>> but
> >>>>>> not
> >>>>>>>>> RocksDB store? What do you mean by "pass through"?
IMHO, all
> >>>> stores
> >>>>>>>>> should behave the same at DSL level.
> >>>>>>>>>    -> if we store the headers in the state stores,
what is the
> >>>>> upgrade
> >>>>>>>>> path?
> >>>>>>>>>
> >>>>>>>>>  - Why do we need to store record header in state
in the first
> >>>>> place,
> >>>>>> if
> >>>>>>>>> we exclude stateful operator at DSL level?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> What is the motivation for the "border lines" you
choose?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> >>>>>>>>>> Jorge,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the KIP, I know this is a feature
others in the
> >>>>> community
> >>>>>>> have
> >>>>>>>>>> been interested in getting into Kafka Streams.
> >>>>>>>>>>
> >>>>>>>>>> I took a quick pass over it, and I have one
initial question.
> >>>>>>>>>>
> >>>>>>>>>> We recently reduced overloads with KIP-182,
and in this KIP
> >> we
> >>>> are
> >>>>>>>>>> increasing them again.
> >>>>>>>>>>
> >>>>>>>>>> I can see from the KIP why they are necessary,
but I'm
> >>> wondering
> >>>> if
> >>>>>>> there
> >>>>>>>>>> is something else we can do to cut down on the
overloads
> >>>>>> introduced.  I
> >>>>>>>>>> don't have any sound suggestions ATM, so I'll
have to think
> >>> about
> >>>>> it
> >>>>>>> some
> >>>>>>>>>> more, but I wanted to put the thought out there.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Bill
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban
Quilcate
> >> Otoya <
> >>>>>>>>>> quilcate.jorge@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> I have created a KIP to add Record Headers
support to Kafka
> >>>>> Streams
> >>>>>>> API:
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>>>> 244%3A+Add+Record+Header+support+to+Kafka+Streams
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> The main goal is to be able to use headers
to filter, map
> >> and
> >>>>>> process
> >>>>>>>>>>> records as streams. Stateful processing
(joins, windows) are
> >>> not
> >>>>>>>>>>> considered.
> >>>>>>>>>>>
> >>>>>>>>>>> Proposed changes/Draft:
> >>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:
> >>>>>> streams-headers
> >>>>>>>>>>>
> >>>>>>>>>>> Feedback and suggestions are more than welcome.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>>
> >>>>>>>>>>> Jorge.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >
> >
> >
>
>


-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message