kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: KIP-244: Add Record Header support to Kafka Streams
Date Mon, 07 May 2018 04:03:54 GMT
I agree that we should not block this KIP if possible. Nevertheless, we
should try to get a reasonable default strategy for inheriting the
headers so we don't need to change it later on.

Let's see what others think. I still tend slightly toward setting headers
to `null` by default in all cases. If the default strategy is different for
different operators, as you suggest, it might be confusing to users.
IMHO, the default behavior should be as simple as possible.
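
To make the two candidate defaults concrete, here is a minimal sketch in
plain Java. It is only an illustration under assumptions: `Rec`, `Strategy`,
`forward()` and `strategyFrom()` are hypothetical names and not part of the
Kafka Streams API, and `inherit.record.headers` is the opt-in idea from the
quoted discussion below, not an existing config.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical model of header inheritance; none of these types are
 * Kafka Streams API.
 */
final class HeaderInheritanceSketch {

    /** The two default strategies under discussion. */
    enum Strategy { INHERIT, SET_NULL }

    /** Minimal stand-in for a record; headers may be null. */
    static final class Rec {
        final String key;
        final String value;
        final long timestamp;
        final Map<String, String> headers;

        Rec(String key, String value, long timestamp, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = headers;
        }
    }

    /**
     * Reads the opt-in flag floated in the thread. The property name is only
     * an idea from the discussion, not an existing Kafka Streams config.
     */
    static Strategy strategyFrom(Properties props) {
        boolean inherit = Boolean.parseBoolean(
                props.getProperty("inherit.record.headers", "false"));
        return inherit ? Strategy.INHERIT : Strategy.SET_NULL;
    }

    /** Builds an output record; key and timestamp are always inherited. */
    static Rec forward(Rec input, String newValue, Strategy strategy) {
        Map<String, String> headers = null;
        if (strategy == Strategy.INHERIT && input.headers != null) {
            // Copy so that downstream mutation does not affect the input record.
            headers = new LinkedHashMap<>(input.headers);
        }
        return new Rec(input.key, newValue, input.timestamp, headers);
    }
}
```

With an empty `Properties` object the sketch falls back to `SET_NULL`, which
is the "simple default" argued for above; setting the flag to "true" opts in
to inheritance for every operator alike.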


-Matthias


On 5/6/18 8:53 PM, Guozhang Wang wrote:
> Matthias, thanks for sharing your opinions on the inheritance protocol of
> the record context. I'm thinking maybe we should make this discussion a
> separate KIP by itself? If yes, then KIP-244's scope would be smaller, and
> within KIP-244 we can have a simple inheritance rule of setting headers to
> null when 1) going through stateful operators and 2) sending to any topics.
> 
> 
> Guozhang
> 
> On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <matthias@confluent.io>
> wrote:
> 
>> Making the inheritance protocol a public contract seems reasonable to me.
>>
>> In the current implementation, all output records inherit the offset,
>> timestamp, topic, and partition metadata from the input record. We
>> already added an API to change the timestamp explicitly for the output
>> record though.
>>
>> I think it makes sense to keep the inheritance of offset, topic, and
>> partition. For headers, it's worth discussing. I see arguments for two
>> strategies: (1) inherit by default, (2) set `null` by default.
>> Independent of the default behavior, we should add an API to set headers
>> for output records explicitly though (similar to the "set timestamp API").
>>
>> From my point of view, timestamp/headers are a different
>> "class/category" of data/metadata than topic/partition/offset. For the
>> first category, it makes sense to manipulate them and it's more than
>> "plain metadata"; especially the timestamp. For the second category it
>> does not make sense to manipulate it, and to me topic/partition/offset
>> is pure metadata only---strictly speaking, it's even questionable if
>> output records should have any value for topic/partition/offset in the
>> first place, or if they should be `null`, because those attributes
>> only make sense for source records that are consumed directly from a
>> topic. On the other hand, if we make this difference explicit,
>> it might be useful information for the user to track the current
>> topic/partition/offset of the original source record.
>>
>> Furthermore, to me, timestamps and headers are somewhat different, too.
>> For stream processing it's required that every record has a timestamp;
>> thus, it makes sense to inherit the input record timestamp by default (a
>> timestamp is not really metadata but actually equally important to key
>> and value from my point of view). Headers, however, are optional, and thus
>> inheriting them is not really required. It might be convenient though:
>> for example, imagine a simple "filter-only" application -- it would be
>> cumbersome for users to explicitly copy the headers from the input
>> records to the output records -- it seems to be unnecessary boilerplate
>> code. On the other hand, for any other more complex use case, it's
>> questionable to inherit headers---note, that headers would be written to
>> the output topics increasing the size of the messages. Overall, I am not
>> sure which default strategy might be the better one for headers. Is
>> there a convincing argument for either one of them? I slightly tend to
>> think that using `null` as default might be better.
>>
>> Last, we could also make the default behavior configurable. Something
>> like `inherit.record.headers=true/false` with default value "false".
>> This would allow people to opt-in for auto-header-inheritance. Just an
>> idea I wanted to add to the discussion---not sure if it's a good one.
>>
>>
>> -Matthias
>>
>> On 5/4/18 3:13 PM, Guozhang Wang wrote:
>>> Hello Jorge,
>>>
>>>> Agree. Probably point 3 handles this. `Headers` being part of
>>>> `RecordContext` would be handled the same way as other attributes.
>>>
>>> Today we do not have a clear inheritance protocol for other fields of
>>> RecordContext yet: although internally we do have some criterion on
>>> topic/partition/offset and timestamp, they are not explicitly exposed to
>>> users.
>>>
>>>
>>> I think we still need to have a defined protocol for headers themselves,
>>> but I agree that it is better scoped outside of this KIP, since this
>>> inheritance protocol itself for all the fields of RecordContext would
>>> better be a separate KIP. We can document this clearly in the wiki page.
>>>
>>> Guozhang
>>>
>>>
>>> On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <
>>> garcia.florian.perso@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> For me this is a great first step to have Headers in streaming.
>>>> My current use case is about distributed tracing (Zipkin) and with the
>>>> headers in the processorContext() I'll be able to manage that for
>>>> most cases.
>>>> The KIP-159 should follow after this but this is where all the major
>>>> questions will arise for stateful operations (as Guozhang said).
>>>>
>>>> Thanks for the work on this Jorge.
>>>>
>>>> On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya <
>>>> quilcate.jorge@gmail.com> wrote:
>>>>
>>>>> Thanks Guozhang and John for your feedback.
>>>>>
>>>>>> 1. We need to have a clear inheritance protocol of headers in our
>>>>>> topology:
>>>>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
>>>>>> 1.b. In DSL stateless operators, it should be straight-forward.
>>>>>> 1.c. What about in stateful operators like aggregates and joins?
>>>>>
>>>>> Agree. Probably point 3 handles this. `Headers` being part of
>>>>> `RecordContext` would be handled the same way as other attributes.
>>>>>
>>>>>> 3. In future work "Adding DSL Processors to use Headers to
>>>>>> filter/map/branch", it may well be covered in KIP-159; worth taking
>>>>>> a look at that KIP.
>>>>>
>>>>> Yes, I will point to it.
>>>>>
>>>>>> 2. In terms of internal implementations, should the state store
>>>>>> cache include the headers then in order to be sent downstream?
>>>>>
>>>>> Good question. As `LRUCacheEntry` extends `RecordContext`, I think
>>>>> this is already supported. I will detail this on the KIP.
>>>>>
>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should
>>>>>> be removed?
>>>>>
>>>>> Fixed, thanks.
>>>>>
>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only
>>>>>> for exposing the headers for reading, and not allowing users to add /
>>>>>> modify headers, right? If yes, we'd better state it clearly at the
>>>>>> "Proposed Changes" section.
>>>>>
>>>>> As headers are exposed in the `ProcessorContext` and will be sent
>>>>> downstream, they can be mutated (add/remove headers).
>>>>>
>>>>>> Also, despite the decreased scope in this KIP, I think it might be
>>>>>> valuable to define what will happen to headers once this change is
>>>>>> implemented. For example, I think a minimal groundwork-level change
>>>>>> might be to make the API changes, while promising to drop all headers
>>>>>> from input records.
>>>>>
>>>>> I would suggest passing headers to downstream nodes rather than
>>>>> dropping them. Clients would have to drop `Headers` themselves once
>>>>> they have used them. Or there could be something like a boolean config
>>>>> property that manages this.
>>>>> I would like to hear feedback here.
>>>>>
>>>>>> A maximal groundwork change would be to forward the headers through
>>>>>> all operators in Streams. But I think there are some unresolved
>>>>>> questions about forwarding, like "what happens to the headers in a
>>>>>> join?"
>>>>>
>>>>> Probably this will be solved once KIP-159 is implemented and supports
>>>>> Headers.
>>>>>
>>>>>> There's of course some middle ground, but instinctively, I think I'd
>>>>>> prefer to have a clear definition that headers are currently *not*
>>>>>> forwarded, rather than having a complex list of operators that do or
>>>>>> don't forward them. Plus, I think it might be tricky to define this
>>>>>> behavior while not allowing the scope to return to that of your
>>>>>> original proposal!
>>>>>
>>>>> Agree. But `Headers` were forwarded *explicitly* in the original
>>>>> proposal. The current one passes them as part of `RecordContext`, so
>>>>> whether they are forwarded or not is the same as for `RecordContext`.
>>>>> On top of this implementation, we can design how filter/map/join will
>>>>> be handled, probably following the KIP-159 approach.
>>>>>
>>>>> Cheers,
>>>>> Jorge.
>>>>>
>>>>> On Wed, May 2, 2018 at 22:56, Guozhang Wang (<wangguoz@gmail.com>)
>>>>> wrote:
>>>>>
>>>>>> Hi Jorge,
>>>>>>
>>>>>> Thanks for the written KIP! Made a pass over it and left some comments
>>>>>> (some of them overlapped with John's):
>>>>>>
>>>>>> 1. We need to have a clear inheritance protocol of headers in our
>>>>>> topology:
>>>>>>
>>>>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
>>>>>> 1.b. In DSL stateless operators, it should be straight-forward.
>>>>>> 1.c. What about in stateful operators like aggregates and joins?
>>>>>>
>>>>>> 2. In terms of internal implementations, should the state store cache
>>>>>> include the headers then in order to be sent downstream?
>>>>>>
>>>>>> 3. In future work "Adding DSL Processors to use Headers to
>>>>>> filter/map/branch", it may well be covered in KIP-159; worth taking
>>>>>> a look at that KIP.
>>>>>>
>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should
>>>>>> be removed?
>>>>>>
>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only
>>>>>> for exposing the headers for reading, and not allowing users to add /
>>>>>> modify headers, right? If yes, we'd better state it clearly at the
>>>>>> "Proposed Changes" section.
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <john@confluent.io> wrote:
>>>>>>
>>>>>>> Hi Jorge,
>>>>>>>
>>>>>>> Thanks for the design work.
>>>>>>>
>>>>>>> I agree that de-scoping the work to just the Processor API will help
>>>>>>> contain the design and implementation complexity.
>>>>>>>
>>>>>>> In the KIP, it mentions that the headers would be available in the
>>>>>>> ProcessorContext, (like "context.headers()"). It also says that
>>>>>>> implementers would need to implement the method "void process(K key,
>>>>>>> V value, Headers headers);". I think maybe you meant to remove the
>>>>>>> proposal to modify "process", since it wouldn't be necessary in
>>>>>>> conjunction with the ProcessorContext change, and it's not
>>>>>>> represented in your PR.
>>>>>>>
>>>>>>> Also, despite the decreased scope in this KIP, I think it might be
>>>>>>> valuable to define what will happen to headers once this change is
>>>>>>> implemented. For example, I think a minimal groundwork-level change
>>>>>>> might be to make the API changes, while promising to drop all headers
>>>>>>> from input records.
>>>>>>>
>>>>>>> A maximal groundwork change would be to forward the headers through
>>>>>>> all operators in Streams. But I think there are some unresolved
>>>>>>> questions about forwarding, like "what happens to the headers in a
>>>>>>> join?"
>>>>>>>
>>>>>>> There's of course some middle ground, but instinctively, I think I'd
>>>>>>> prefer to have a clear definition that headers are currently *not*
>>>>>>> forwarded, rather than having a complex list of operators that do or
>>>>>>> don't forward them. Plus, I think it might be tricky to define this
>>>>>>> behavior while not allowing the scope to return to that of your
>>>>>>> original proposal!
>>>>>>>
>>>>>>> Thanks again for the KIP,
>>>>>>> -John
>>>>>>>
>>>>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <
>>>>>>> quilcate.jorge@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Matthias,
>>>>>>>>
>>>>>>>> I've created a new JIRA to track this, updated the KIP and created a
>>>>>>>> PR.
>>>>>>>>
>>>>>>>> Looking forward to your feedback,
>>>>>>>>
>>>>>>>> Jorge.
>>>>>>>>
>>>>>>>> On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax (<matthias@confluent.io>)
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Jorge,
>>>>>>>>>
>>>>>>>>> I would like to unblock this KIP to make some progress. The tricky
>>>>>>>>> question of this work seems to be how to expose headers at DSL
>>>>>>>>> level. This is related to KIP-149 and KIP-159. However, for the
>>>>>>>>> Processor API, it seems to be rather straightforward to add headers
>>>>>>>>> to the API.
>>>>>>>>>
>>>>>>>>> Thus, I would suggest de-scoping this KIP and adding header support
>>>>>>>>> for the Processor API only as a first step. Once this is done, we
>>>>>>>>> can see in a second step how to add headers at DSL level.
>>>>>>>>>
>>>>>>>>> WDYT about this proposal?
>>>>>>>>>
>>>>>>>>> If you agree, please update the JIRA and KIP accordingly. Note that
>>>>>>>>> we have two JIRAs that are duplicates atm. We can scope them
>>>>>>>>> accordingly: one for PAPI only, and a second as a dependent JIRA
>>>>>>>>> for DSL.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
>>>>>>>>>> Thanks for your feedback!
>>>>>>>>>>
>>>>>>>>>> 1. I was adding headers to KeyValue to support groupBy, but I think
>>>>>>>>>> it is not necessary. It should be enough to map headers to
>>>>>>>>>> key/value and then group using the current KeyValue structure.
>>>>>>>>>>
>>>>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on KV as their
>>>>>>>>>> structure, hence considering headers as part of stateful operations
>>>>>>>>>> will not fit this approach and will increase complexity (I cannot
>>>>>>>>>> think of a use case that needs this).
>>>>>>>>>>
>>>>>>>>>> 3. and 4. Changes on 1. will solve this issue.
>>>>>>>>>>
>>>>>>>>>> I probably rushed a bit in proposing this change; I was not aware of
>>>>>>>>>> KIP-159 or KAFKA-5632.
>>>>>>>>>> If KIP-159 is adopted, reducing this KIP to adding Headers to
>>>>>>>>>> RecordContext will be enough, but I'm not sure about the scope of
>>>>>>>>>> KIP-159. If it includes stateful operations, it will be difficult
>>>>>>>>>> to implement, as stated in 2.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Jorge.
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax (<matthias@confluent.io>)
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP Jorge,
>>>>>>>>>>>
>>>>>>>>>>> As Bill pointed out already, we should be careful with adding new
>>>>>>>>>>> overloads as this contradicts the work done via KIP-182.
>>>>>>>>>>>
>>>>>>>>>>> This KIP also seems to be related to KIP-149 and KIP-159. Are you
>>>>>>>>>>> aware of them? Both have quite long DISCUSS threads, but it might
>>>>>>>>>>> be worth browsing through them.
>>>>>>>>>>>
>>>>>>>>>>> A few further questions:
>>>>>>>>>>>
>>>>>>>>>>>  - why do you want to add the headers to `KeyValue`? I am not sure
>>>>>>>>>>> if we should consider headers as optional metadata and add it to
>>>>>>>>>>> `RecordContext` similar to timestamp, offset, etc. only
>>>>>>>>>>>
>>>>>>>>>>>  - You only include stateless single-record transformations at the
>>>>>>>>>>> DSL level. Do you suggest that all other operator just drop
>>>>>>>>>>> headers on the floor?
>>>>>>>>>>>
>>>>>>>>>>>  - Why do you only want to put headers into in-memory and cache but
>>>>>>>>>>> not RocksDB store? What do you mean by "pass through"? IMHO, all
>>>>>>>>>>> stores should behave the same at DSL level.
>>>>>>>>>>>    -> if we store the headers in the state stores, what is the
>>>>>>>>>>> upgrade path?
>>>>>>>>>>>
>>>>>>>>>>>  - Why do we need to store record header in state in the first
>>>>>>>>>>> place, if we exclude stateful operator at DSL level?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> What is the motivation for the "border lines" you choose?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
>>>>>>>>>>>> Jorge,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP, I know this is a feature others in the
>>>>>>>>>>>> community have been interested in getting into Kafka Streams.
>>>>>>>>>>>>
>>>>>>>>>>>> I took a quick pass over it, and I have one initial question.
>>>>>>>>>>>>
>>>>>>>>>>>> We recently reduced overloads with KIP-182, and in this KIP we are
>>>>>>>>>>>> increasing them again.
>>>>>>>>>>>>
>>>>>>>>>>>> I can see from the KIP why they are necessary, but I'm wondering
>>>>>>>>>>>> if there is something else we can do to cut down on the overloads
>>>>>>>>>>>> introduced.  I don't have any sound suggestions ATM, so I'll have
>>>>>>>>>>>> to think about it some more, but I wanted to put the thought out
>>>>>>>>>>>> there.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Bill
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya <
>>>>>>>>>>>> quilcate.jorge@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have created a KIP to add Record Headers support to Kafka
>>>>>>>>>>>>> Streams API:
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> The main goal is to be able to use headers to filter, map and
>>>>>>>>>>>>> process records as streams. Stateful processing (joins, windows)
>>>>>>>>>>>>> is not considered.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Proposed changes/Draft:
>>>>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
>>>>>>>>>>>>>
>>>>>>>>>>>>> Feedback and suggestions are more than welcome.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>
>>
> 
> 

