druid-dev mailing list archives

From Ben Krug <ben.k...@imply.io>
Subject Re: [Proposal] - Kafka Input Format for headers, key and payload parsing
Date Tue, 07 Sep 2021 19:42:55 GMT
I'm not a coder, but I wanted to say that I have heard other Druid users ask
for this functionality, so I think it would be useful.
Thank you!

On Tue, Sep 7, 2021 at 10:09 AM Lokesh Lingarajan
<llingarajan@confluent.io.invalid> wrote:

> Hope everyone had a good long weekend. Any updates/comments?
>
> -Lokesh
>
>
> On Mon, Aug 30, 2021 at 2:43 PM Lokesh Lingarajan <
> llingarajan@confluent.io>
> wrote:
>
> > Motivation
> >
> > Today we ingest a number of high-cardinality metrics into Druid across
> > many dimensions. These metrics are rolled up on a per-minute basis and are
> > very useful when looking at metrics on a partition or client basis. Events
> > are another class of data that provides useful information about a
> > particular incident/scenario inside a Kafka cluster. Events themselves are
> > carried inside the Kafka payload, but there is also some very useful
> > metadata carried in Kafka headers that can serve as dimensions for
> > aggregation and, in turn, bring better insights.
> >
> > PR(#10730 <https://github.com/apache/druid/pull/10730>) introduced
> > support for Kafka headers in InputFormats.
> >
> > We still need an input format to parse out the headers and translate them
> > into relevant columns in Druid. Until that's implemented, none of the
> > information available in the Kafka message headers is exposed. So first
> > there is a need to implement an input format that can parse headers in any
> > given format (provided we support the format), just as we parse payloads
> > today. Apart from headers, there is also useful information present in the
> > key portion of the Kafka record, so we also need a way to expose the data
> > in the key as Druid columns. Finally, we need a generic way to express at
> > configuration time which attributes from the headers, key, and payload
> > should be ingested into Druid, and the design should be generic enough
> > that users can specify different parsers for the headers, key, and
> > payload.
> >
> > The proposal is to design an input format that solves the above by
> > providing a wrapper around any existing input format and merging the data
> > into a single unified Druid row.
> > Proposed changes
> >
> > Let's look at a sample input format from the above discussion:
> >
> > "inputFormat": {
> >     "type": "kafka",                        // New input format type
> >     "headerLabelPrefix": "kafka.header.",   // Label prefix for header columns; avoids collisions while merging columns
> >     "recordTimestampLabelPrefix": "kafka.", // The Kafka record's timestamp is made available in case the payload does not carry a timestamp
> >     "headerFormat": {                       // Header parser specifying that values are of type string
> >         "type": "string"
> >     },
> >     "valueFormat": {                        // Value parser, here parsing JSON
> >         "type": "json",
> >         "flattenSpec": {
> >             "useFieldDiscovery": true,
> >             "fields": [...]
> >         }
> >     },
> >     "keyFormat": {                          // Key parser, also parsing JSON
> >         "type": "json"
> >     }
> > }
> >
> > Since we have independent sections for the header, key, and payload, this
> > also enables parsing each section with its own parser, e.g., headers
> > coming in as strings and the payload as JSON.
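> >
> > As an illustration (a sketch only; the header names, key, and payload
> > fields below are hypothetical, not part of the proposal), a Kafka record
> > handled by the spec above might look like this:
> >
> >     Headers: env=production, cluster-id=lkc-abc123       <- parsed by "headerFormat" (string)
> >     Key:     "client-42"                                 <- parsed by "keyFormat" (json)
> >     Value:   {"incident": "broker-restart", "count": 3}  <- parsed by "valueFormat" (json)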
> >
> > KafkaInputFormat (the new inputFormat class) will be the uber class
> > extending the InputFormat interface. It will be responsible for creating
> > the individual parsers for the header, key, and payload, blending the data
> > while resolving column conflicts, and generating a single unified InputRow
> > for Druid ingestion.
> >
> > "headerFormat" will allow users to plug in a parser type for the header
> > values and will add the default header prefix as "kafka.header."(can be
> > overridden) for attributes to avoid collision while merging attributes
> with
> > payload.
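> >
> > For example, with the default prefix, the hypothetical headers above would
> > surface as the columns
> >
> >     "kafka.header.env": "production"
> >     "kafka.header.cluster-id": "lkc-abc123"
> >
> > so a payload field that also happened to be named "env" would not collide.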
> >
> > The Kafka payload parser will be responsible for parsing the value portion
> > of the Kafka record. This is where most of the data will come from, and we
> > should be able to plug in existing parsers. One thing to note here is that
> > if batching is performed, then the code should append the header and key
> > values to every record in the batch.
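> >
> > For instance (again a hypothetical sketch), if a single record's value
> > carried two JSON objects,
> >
> >     {"incident": "broker-restart", "count": 3}
> >     {"incident": "broker-restart", "count": 5}
> >
> > both resulting rows would carry the same kafka.header.* and kafka.key
> > values from that record.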
> >
> > The Kafka key parser will handle parsing the key portion of the Kafka
> > record and will ingest the key under the dimension name "kafka.key".
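> >
> > Putting it together, the hypothetical record above would produce a single
> > unified row along these lines (a sketch; the exact timestamp column name
> > produced by "recordTimestampLabelPrefix" is an assumption here):
> >
> >     {
> >         "kafka.timestamp": 1630962000000,
> >         "kafka.key": "client-42",
> >         "kafka.header.env": "production",
> >         "kafka.header.cluster-id": "lkc-abc123",
> >         "incident": "broker-restart",
> >         "count": 3
> >     }
> >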
> > Operational impact, Test plan & Future work
> >
> > Since we had an immediate need to ingest blended data from the header and
> > payload, we have implemented the above proposal in a PR here:
> > <https://github.com/apache/druid/pull/11630>
> > -Lokesh Lingarajan
> >
>
