druid-dev mailing list archives

From Lokesh Lingarajan <llingara...@confluent.io.INVALID>
Subject Re: [Proposal] - Kafka Input Format for headers, key and payload parsing
Date Fri, 17 Sep 2021 16:43:39 GMT
Hi Gian,

Thanks for your reply; please find my comments below.

1) How is the timestamp exposed exactly? I see there is a
recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you
think about accepting the entire name of the timestamp field instead?
Finally: in the docs it would be good to have an example of how people can
write a timestampSpec that refers to the Kafka timestamp, and also how they
can load the Kafka timestamp as a long-typed dimension storing millis since
the epoch (our convention for secondary timestamps).

>>> The input format lets users pick and choose the timestamp value from the
header, key, or value portions of the Kafka record. If the timestamp is missing
from both the key and the value parts, users can always fall back to the record
timestamp that is made available alongside the headers. The code defaults this
column to the name "kafka.timestamp"; recordTimestampLabelPrefix allows users to
change the "kafka." prefix to something else. If this model deviates from what
we currently have in Druid, then I agree we should change it to accept the full
column name instead. Currently the timestamp is read directly from the
ConsumerRecord<K, V> data structure as follows:

// Add the Kafka record timestamp to the merge map; the record timestamp is
// skipped if the same key already exists in the header list.
mergeMap.putIfAbsent(recordTimestampColumn, record.getRecord().timestamp());
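
For the docs, a rough sketch of a timestampSpec that uses this merged Kafka
record timestamp as the primary timestamp could look like the following
(assuming the default "kafka.timestamp" column name described above; the exact
prefix/name is still configurable):

"timestampSpec": {
  "column": "kafka.timestamp",
  "format": "millis"
}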


2) You mention that the key will show up as "kafka.key", and in the example
you provide I don't see a parameter enabling a choice of what that field is
called. Is it hard-coded or is it configurable somehow?

>>> This behavior is exactly the same as for the timestamp discussed above. By
default we will have a column named "kafka.key", and users have the choice to
change the "kafka." prefix to something else. We can make this uniform here as
well, based on the decision above.
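
To load the Kafka timestamp as a long-typed secondary timestamp and the key as
a regular dimension, they could simply be listed in the dimensionsSpec, e.g.
(again a sketch using the default names, subject to the naming decision above):

"dimensionsSpec": {
  "dimensions": [
    { "type": "long", "name": "kafka.timestamp" },
    { "type": "string", "name": "kafka.key" }
  ]
}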

3) Could you write up some user-facing docs too, like an addition to
development/extensions-core/kafka-ingestion.md? That way, people will know
how to use this feature. And it'll help us better understand how it's
supposed to work. (Perhaps it could have answered the two questions above)

>>> Absolutely agree with you. I will do that along with addressing the other
review comments on the code.

Thanks again for looking into this :)

-Lokesh


On Thu, Sep 16, 2021 at 9:34 AM Gian Merlino <gian@apache.org> wrote:

> Lokesh, it looks like you got dropped from the thread, so I'm adding you
> back. Please check out the previous message for some comments.
>
> By the way, by default, replies to the dev list go back to the dev list
> only, which can cause you to miss some replies. If you join the list you
> will be sure to get all your replies 🙂
>
> On Tue, Sep 14, 2021 at 10:10 PM Gian Merlino <gian@apache.org> wrote:
>
>> Hey Lokesh,
>>
>> The concept and API looks solid to me! Thank you for writing this up. I
>> agree with Ben's comment. This will be really useful functionality.
>>
>> I have a few questions about how it would work:
>>
>> 1) How is the timestamp exposed exactly? I see there is a
>> recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you
>> think about accepting the entire name of the timestamp field instead?
>> Finally: in the docs it would be good to have an example of how people can
>> write a timestampSpec that refers to the Kafka timestamp, and also how they
>> can load the Kafka timestamp as a long-typed dimension storing millis since
>> the epoch (our convention for secondary timestamps).
>>
>> 2) You mention that the key will show up as "kafka.key", and in the
>> example you provide I don't see a parameter enabling a choice of what that
>> field is called. Is it hard-coded or is it configurable somehow?
>>
>> 3) Could you write up some user-facing docs too, like an addition to
>> development/extensions-core/kafka-ingestion.md? That way, people will know
>> how to use this feature. And it'll help us better understand how it's
>> supposed to work. (Perhaps it could have answered the two questions above)
>>
>> Full disclosure: I haven't reviewed the patch yet; these questions are
>> just based on your writeup.
>>
>> On Mon, Aug 30, 2021 at 3:00 PM Lokesh Lingarajan
>> <llingarajan@confluent.io.invalid> wrote:
>>
>>> Motivation
>>>
>>> Today we ingest a number of high-cardinality metrics into Druid across
>>> dimensions. These metrics are rolled up on a per-minute basis and are very
>>> useful when looking at metrics on a partition or client basis. Events are
>>> another class of data that provides useful information about a particular
>>> incident/scenario inside a Kafka cluster. The events themselves are carried
>>> inside the Kafka payload, but there is also some very useful metadata
>>> carried in Kafka headers that can serve as dimensions for aggregation and
>>> in turn bring better insights.
>>>
>>> PR #10730 (<https://github.com/apache/druid/pull/10730>) introduced support
>>> for Kafka headers in InputFormats.
>>>
>>> We still need an input format to parse the headers and translate them into
>>> relevant columns in Druid. Until that is implemented, none of the
>>> information available in the Kafka message headers is exposed. So first
>>> there is a need to implement an input format that can parse headers in any
>>> given format (provided we support the format), just as we parse payloads
>>> today. Apart from headers, there is also some useful information present in
>>> the key portion of the Kafka record, so we also need a way to expose the
>>> data present in the key as Druid columns. We need a generic way to express
>>> at configuration time which attributes from the headers, key and payload
>>> need to be ingested into Druid, and the design should be generic enough
>>> that users can specify different parsers for the headers, key and payload.
>>>
>>> The proposal is to design an input format that solves the above by
>>> providing a wrapper around any existing input formats and merging the data
>>> into a single unified Druid row.
>>>
>>> Proposed changes
>>>
>>> Let's look at a sample input format from the above discussion
>>>
>>> "inputFormat": {
>>>   "type": "kafka",                        // New input format type
>>>   "headerLabelPrefix": "kafka.header.",   // Label prefix for header columns; this will avoid collisions while merging columns
>>>   "recordTimestampLabelPrefix": "kafka.", // Kafka record's timestamp is made available in case the payload does not carry a timestamp
>>>   "headerFormat": {                       // Header parser specifying that values are of type string
>>>     "type": "string"
>>>   },
>>>   "valueFormat": {                        // Value parser, here JSON
>>>     "type": "json",
>>>     "flattenSpec": {
>>>       "useFieldDiscovery": true,
>>>       "fields": [...]
>>>     }
>>>   },
>>>   "keyFormat": {                          // Key parser, also JSON
>>>     "type": "json"
>>>   }
>>> }
>>>
>>> Since we have independent sections for header, key and payload, this also
>>> enables parsing each section with its own parser, e.g., headers coming in
>>> as strings and the payload as JSON.
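>>>
>>> To make the merging concrete, here is a purely hypothetical example (names
>>> and values invented for illustration, not taken from the PR): a record with
>>> header env=prod, key "abc-123", record timestamp 1631827200000 and a JSON
>>> payload {"ts": "2021-09-16T20:00:00Z", "metric": "request-count",
>>> "value": 10} would be blended into a single unified row roughly like:
>>>
>>> {
>>>   "kafka.header.env": "prod",
>>>   "kafka.key": "abc-123",
>>>   "kafka.timestamp": 1631827200000,
>>>   "ts": "2021-09-16T20:00:00Z",
>>>   "metric": "request-count",
>>>   "value": 10
>>> }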
>>>
>>> KafkaInputFormat (the new InputFormat class) will be the uber class
>>> implementing the InputFormat interface. It will be responsible for creating
>>> the individual parsers for header, key and payload, blending the data while
>>> resolving column conflicts, and generating a single unified InputRow for
>>> Druid ingestion.
>>>
>>> "headerFormat" will allow users to plug in a parser type for the header
>>> values and will add the default header prefix as "kafka.header."(can be
>>> overridden) for attributes to avoid collision while merging attributes
>>> with
>>> payload.
>>>
>>> The Kafka payload parser will be responsible for parsing the value portion
>>> of the Kafka record. This is where most of the data will come from, and we
>>> should be able to plug in existing parsers. One thing to note is that if
>>> batching is performed, the code should attach the header and key values to
>>> every record in the batch.
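>>>
>>> For example (again hypothetical), if the value of one Kafka record parses
>>> into two events, both resulting rows should carry the same header, key and
>>> timestamp columns:
>>>
>>> {"kafka.header.env": "prod", "kafka.key": "abc-123", "kafka.timestamp": 1631827200000, "metric": "request-count", "value": 10}
>>> {"kafka.header.env": "prod", "kafka.key": "abc-123", "kafka.timestamp": 1631827200000, "metric": "error-count", "value": 2}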
>>>
>>> The Kafka key parser will handle parsing the key portion of the Kafka
>>> record and will ingest the key with the dimension name "kafka.key".
>>>
>>> Operational impact, Test plan & Future work
>>>
>>> Since we had an immediate need to ingest blended data from the header and
>>> payload, we have already implemented the above proposal in a PR here:
>>> <https://github.com/apache/druid/pull/11630>
>>>
>>> -Lokesh Lingarajan
>>>
>>
