druid-dev mailing list archives

From Lokesh Lingarajan <llingara...@confluent.io.INVALID>
Subject Re: [Proposal] - Kafka Input Format for headers, key and payload parsing
Date Tue, 07 Sep 2021 16:29:31 GMT
Hope everyone had a good long weekend. Any updates/comments ?

-Lokesh


On Mon, Aug 30, 2021 at 2:43 PM Lokesh Lingarajan <llingarajan@confluent.io>
wrote:

> Motivation
>
> Today we ingest a number of high-cardinality metrics into Druid across many
> dimensions. These metrics are rolled up on a per-minute basis and are very
> useful when looking at metrics on a partition or client basis. Events are
> another class of data that provides useful information about a particular
> incident/scenario inside a Kafka cluster. Events themselves are carried
> inside the Kafka payload, but there is also very useful metadata carried in
> Kafka headers that can serve as dimensions for aggregation and in turn bring
> better insights.
>
> PR #10730 (<https://github.com/apache/druid/pull/10730>) introduced
> support for Kafka headers in InputFormats.
>
> We still need an input format that parses the headers and translates them
> into relevant columns in Druid. Until that is implemented, none of the
> information available in the Kafka message headers is exposed. So first we
> need an input format that can parse headers in any supported format, just as
> we parse payloads today. Apart from headers, there is also useful information
> in the key portion of the Kafka record, and we need a way to expose that data
> as Druid columns as well. Finally, we need a generic way to express at
> configuration time which attributes from the headers, key and payload should
> be ingested into Druid, and the design should be generic enough that users
> can specify different parsers for headers, key and payload.
>
> The proposal is to design an input format that solves the above by wrapping
> any existing input formats and merging the parsed data into a single unified
> Druid row.
> Proposed changes
>
> Let's look at a sample input format from the above discussion
>
>
> "inputFormat":
> {
>     "type": "kafka",                         // New input format type
>     "headerLabelPrefix": "kafka.header.",    // Label prefix for header columns; avoids collisions while merging columns
>     "recordTimestampLabelPrefix": "kafka.",  // Kafka record's timestamp is made available in case the payload does not carry a timestamp
>     "headerFormat":                          // Header parser specifying that values are of type string
>     {
>         "type": "string"
>     },
>     "valueFormat":                           // Value parser, here json
>     {
>         "type": "json",
>         "flattenSpec":
>         {
>             "useFieldDiscovery": true,
>             "fields": [...]
>         }
>     },
>     "keyFormat":                             // Key parser, also json
>     {
>         "type": "json"
>     }
> }
>
> Since we have independent sections for the header, key and payload, each
> section can be parsed with its own parser, e.g., headers coming in as strings
> and the payload as JSON.
>
> KafkaInputFormat (the new InputFormat class) will be the uber class
> extending the InputFormat interface. It will be responsible for creating
> individual parsers for the header, key and payload, blending the data while
> resolving column conflicts, and generating a single unified InputRow for
> Druid ingestion.
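>
> As a rough, simplified sketch of that flow (illustrative only, not the code
> in the PR; the class, method and parameter names here are assumptions), the
> wrapper essentially composes parsers for the three sections and merges their
> output into one row:
>
> import java.util.HashMap;
> import java.util.Map;
>
> // Illustrative sketch of the merge step only; it does not implement Druid's
> // actual InputFormat/InputEntityReader interfaces.
> public class KafkaInputFormatSketch
> {
>   interface SectionParser
>   {
>     Map<String, Object> parse(byte[] bytes);
>   }
>
>   private final SectionParser keyParser;    // e.g. a json parser
>   private final SectionParser valueParser;  // e.g. a json parser with a flattenSpec
>   private final String headerLabelPrefix;   // e.g. "kafka.header."
>
>   public KafkaInputFormatSketch(SectionParser keyParser, SectionParser valueParser, String headerLabelPrefix)
>   {
>     this.keyParser = keyParser;
>     this.valueParser = valueParser;
>     this.headerLabelPrefix = headerLabelPrefix;
>   }
>
>   // Headers are assumed to be already decoded to strings by the header parser.
>   public Map<String, Object> toUnifiedRow(Map<String, String> headers, byte[] key, byte[] value, long timestampMillis)
>   {
>     // Payload columns form the base of the row.
>     Map<String, Object> row = new HashMap<>(valueParser.parse(value));
>     // Header columns are prefixed so they cannot collide with payload columns.
>     headers.forEach((k, v) -> row.put(headerLabelPrefix + k, v));
>     // The key is exposed under a fixed dimension name.
>     row.put("kafka.key", keyParser.parse(key));
>     // The record timestamp is exposed as well ("kafka.timestamp" is an assumed
>     // column name derived from the "kafka." prefix in the spec above).
>     row.put("kafka.timestamp", timestampMillis);
>     return row;
>   }
> }
>
> The real implementation additionally has to handle the Druid InputRow
> plumbing and timestamp/dimension specs, which are omitted here.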
>
> "headerFormat" will allow users to plug in a parser type for the header
> values and will add the default header prefix as "kafka.header."(can be
> overridden) for attributes to avoid collision while merging attributes with
> payload.
>
> The Kafka payload parser will be responsible for parsing the value portion of
> the Kafka record. This is where most of the data will come from, and we
> should be able to plug in existing parsers. One thing to note here is that if
> batching is performed, the code should augment every record in the batch with
> the header and key values, as sketched below.
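>
> Building on the sketch above (again illustrative; parsedHeaders and parsedKey
> stand for the already-parsed header map and key value of the current Kafka
> record, and java.util.List is needed in addition to the existing imports),
> the augmentation would simply be applied to every row produced from the
> batched value:
>
> // Every row parsed out of a batched value gets the same header- and
> // key-derived columns.
> static java.util.List<Map<String, Object>> augmentBatch(java.util.List<Map<String, Object>> batchRows,
>                                                         Map<String, String> parsedHeaders,
>                                                         Object parsedKey,
>                                                         long timestampMillis,
>                                                         String headerLabelPrefix)
> {
>   for (Map<String, Object> row : batchRows) {
>     parsedHeaders.forEach((k, v) -> row.put(headerLabelPrefix + k, v));
>     row.put("kafka.key", parsedKey);
>     row.put("kafka.timestamp", timestampMillis);
>   }
>   return batchRows;
> }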
>
> The Kafka key parser will handle parsing the key portion of the Kafka record
> and will ingest the key under the dimension name "kafka.key", as illustrated
> in the example below.
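>
> To make the column naming concrete, here is an illustrative end-to-end
> example using the sample spec above (the record contents are made up, and
> "kafka.timestamp" is an assumed column name derived from the "kafka."
> prefix):
>
> Incoming Kafka record:
>   headers:   environment=prod, cluster-id=lkc-123
>   key:       {"id": "abc"}
>   value:     {"metric": "produce-rate", "count": 42}
>   timestamp: 1630368000000
>
> Resulting Druid row:
>   kafka.header.environment = "prod"
>   kafka.header.cluster-id  = "lkc-123"
>   kafka.key                = parsed key ({"id": "abc"})
>   kafka.timestamp          = 1630368000000
>   metric                   = "produce-rate"
>   count                    = 42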
> Operational impact, Test plan & Future work
>
> Since we had an immediate need to ingest blended data from headers and
> payload, we have implemented the above proposal in a PR here:
> <https://github.com/apache/druid/pull/11630>
> -Lokesh Lingarajan
>
