druid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gian Merlino <g...@apache.org>
Subject Re: [Proposal] - Kafka Input Format for headers, key and payload parsing
Date Wed, 22 Sep 2021 00:11:37 GMT
Hey Lokesh,

Thanks for the details. To me it makes more sense to have the user specify
the entire timestamp and key field name (it seems weird to have a
"timestamp prefix" and "key prefix" that are only used for single fields).
I just wrote that + a few comments on the PR itself:
https://github.com/apache/druid/pull/11630#pullrequestreview-760351816

On Fri, Sep 17, 2021 at 9:43 AM Lokesh Lingarajan <llingarajan@confluent.io>
wrote:

> Hi Gian,
>
> Thanks for the your reply, please find below are my comments
>
> 1) How is the timestamp exposed exactly? I see there is a
> recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you
> think about accepting the entire name of the timestamp field instead?
> Finally: in the docs it would be good to have an example of how people can
> write a timestampSpec that refers to the Kafka timestamp, and also how they
> can load the Kafka timestamp as a long-typed dimension storing millis since
> the epoch (our convention for secondary timestamps).
>
> >>> The input format allows users to pick and choose the timestamp value
> either from the header/key/value portions of the kafka record. If the
> timestamp is missing in both key and value parts, then users can always
> default to the timestamp that is available in the header. Code will default
> this column with the name "kafka.timestamp". recordTimestampLabelPrefix allows
> users to change the "kafka" to something else. If this model is deviating
> from what we currently have in druid, then I agree we should change this to
> giving a full name.  Currently timestamp is loaded directly from
> ConsumerRecord<K, V> data structure as follows
>
> // Add kafka record timestamp to the mergelist, we will skip record timestamp if the
same key exists already in the header list
> mergeMap.putIfAbsent(recordTimestampColumn, record.getRecord().timestamp());
>
>
> 2) You mention that the key will show up as "kafka.key", and in the
> example you provide I don't see a parameter enabling a choice of what that
> field is called. Is it hard-coded or is it configurable somehow?
>
> >>> this behavior is exactly the same as the timestamp discussed above. If
> nothing is done, we will have a column named "kafka.key", users have the
> choice to change the "kafka" to something else. We can make the change
> uniform here as well based on the above decision.
>
> 3) Could you write up some user-facing docs too, like an addition to
> development/extensions-core/kafka-ingestion.md? That way, people will know
> how to use this feature. And it'll help us better understand how it's
> supposed to work. (Perhaps it could have answered the two questions above)
>
> >>> Absolutely agree with you, I will do that along with other review
> comments from the code.
>
> Thanks again for looking into this :)
>
> -Lokesh
>
>
> On Thu, Sep 16, 2021 at 9:34 AM Gian Merlino <gian@apache.org> wrote:
>
>> Lokesh, it looks like you got dropped from the thread, so I'm adding you
>> back. Please check out the previous message for some comments.
>>
>> By the way, by default, replies to the dev list go back to the dev list
>> only, which can cause you to miss some replies. If you join the list you
>> will be sure to get all your replies 🙂
>>
>> On Tue, Sep 14, 2021 at 10:10 PM Gian Merlino <gian@apache.org> wrote:
>>
>>> Hey Lokesh,
>>>
>>> The concept and API looks solid to me! Thank you for writing this up. I
>>> agree with Ben's comment. This will be really useful functionality.
>>>
>>> I have a few questions about how it would work:
>>>
>>> 1) How is the timestamp exposed exactly? I see there is a
>>> recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you
>>> think about accepting the entire name of the timestamp field instead?
>>> Finally: in the docs it would be good to have an example of how people can
>>> write a timestampSpec that refers to the Kafka timestamp, and also how they
>>> can load the Kafka timestamp as a long-typed dimension storing millis since
>>> the epoch (our convention for secondary timestamps).
>>>
>>> 2) You mention that the key will show up as "kafka.key", and in the
>>> example you provide I don't see a parameter enabling a choice of what that
>>> field is called. Is it hard-coded or is it configurable somehow?
>>>
>>> 3) Could you write up some user-facing docs too, like an addition to
>>> development/extensions-core/kafka-ingestion.md? That way, people will know
>>> how to use this feature. And it'll help us better understand how it's
>>> supposed to work. (Perhaps it could have answered the two questions above)
>>>
>>> Full disclosure: I haven't reviewed the patch yet; these questions are
>>> just based on your writeup.
>>>
>>> On Mon, Aug 30, 2021 at 3:00 PM Lokesh Lingarajan
>>> <llingarajan@confluent.io.invalid> wrote:
>>>
>>>> Motivation
>>>>
>>>> Today we ingest a number of high cardinality metrics into Druid across
>>>> dimensions. These metrics are rolled up on a per minute basis, and are
>>>> very
>>>> useful when looking at metrics on a partition or client basis. Events is
>>>> another class of data that provides useful information about a
>>>> particular
>>>> incident/scenario inside a Kafka cluster. Events themselves are carried
>>>> inside the kafka payload, but nonetheless there is some very useful
>>>> metadata that is carried in kafka headers that can serve as a useful
>>>> dimension for aggregation and in turn bringing better insights.
>>>>
>>>> PR(#10730 <https://github.com/apache/druid/pull/10730>) introduced
>>>> support
>>>> for Kafka headers in InputFormats.
>>>>
>>>> We still need an input format to parse out the headers and translate
>>>> those
>>>> into relevant columns in Druid. Until that’s implemented, none of the
>>>> information available in the Kafka message headers would be exposed. So
>>>> first there is a need to implement an input format that can parse
>>>> headers
>>>> in any given format(provided we support the format) like we parse
>>>> payloads
>>>> today. Apart from headers there is also some useful information present
>>>> in
>>>> the key portion of the kafka record. We also need a way to expose the
>>>> data
>>>> present in the key as druid columns. We need a generic way to express at
>>>> configuration time what attributes from headers, key and payload need
>>>> to be
>>>> ingested into druid. We need to keep the design generic enough so that
>>>> users can specify different parsers for headers, key and payload.
>>>>
>>>> Proposal is to design an input format to solve the above by providing
>>>> wrapper around any existing input formats and merging the data into a
>>>> single unified Druid row.
>>>> Proposed changes
>>>>
>>>> Let's look at a sample input format from the above discussion
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *"inputFormat":{        "type": "kafka", // New input format type
>>>> "headerLabelPrefix": "kafka.header.", // Label prefix for header
>>>> columns,
>>>> this will avoid collisions while merging columns
>>>> "recordTimestampLabelPrefix": "kafka.", // Kafka record's timestamp is
>>>> made
>>>> available in case payload does not carry timestamp
>>>> "headerFormat":
>>>> // Header parser specifying that values are of type string        {
>>>>       "type": "string"        },       "valueFormat": // Value parser
>>>> from
>>>> json parsing       {             "type": "json",
>>>>  "flattenSpec":
>>>> {                     "useFieldDiscovery": true,
>>>> "fields": [...]             }        },        "keyFormat": // Key
>>>> parser
>>>> also from json parsing         {             "type": "json"         }}*
>>>>
>>>> Since we have independent sections for header, key and payload, it will
>>>> also enable parsing each section with its own parser, eg., headers
>>>> coming
>>>> in as string and payload as json.
>>>>
>>>> KafkaInputFormat(the new inputFormat class) will be the uber class
>>>> extending inputFormat interface and will be responsible for creating
>>>> individual parsers for header, key and payload, blend the data resolving
>>>> conflicts in columns and generating a single unified InputRow for Druid
>>>> ingestion.
>>>>
>>>> "headerFormat" will allow users to plug in a parser type for the header
>>>> values and will add the default header prefix as "kafka.header."(can be
>>>> overridden) for attributes to avoid collision while merging attributes
>>>> with
>>>> payload.
>>>>
>>>> Kafka payload parser will be responsible for parsing the Value portion
>>>> of
>>>> the Kafka record. This is where most of the data will come from and we
>>>> should be able to plugin existing parsers. One thing to note here is
>>>> that
>>>> if batching is performed, then the code should be augmenting header and
>>>> key
>>>> values to every record in the batch.
>>>>
>>>> Kafka key parser will handle parsing the Key portion of the Kafka record
>>>> and will ingest the Key with dimension name as "kafka.key".
>>>> Operational impact, Test plan & Future work
>>>>
>>>> Since we had an immediate need to ingest blended data from header and
>>>> payload, we have implemented the above proposal in a PR - here
>>>> <https://github.com/apache/druid/pull/11630>
>>>> -Lokesh Lingarajan
>>>>
>>>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message