gobblin-user mailing list archives

From "Kidwell, Jack" <Jack_Kidwel...@comcast.com>
Subject RE: KafkaSimpleJsonExtractor
Date Thu, 12 Apr 2018 14:39:18 GMT
Thank you for the time you have spent on our question.

This quick start document,
https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/,
shows “source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource”.
That class overrides getExtractor to return “new KafkaSimpleExtractor(state)”, and
KafkaSimpleExtractor overrides decodeRecord to return just the Kafka message value:

  return kafkaConsumerRecord.getMessageBytes();

Since we want both key and value stored in HDFS, KafkaSimpleJsonExtractor appears
to provide the desired decodeRecord method:

protected byte[] decodeRecord(ByteArrayBasedKafkaRecord messageAndOffset) throws IOException {
    long offset = messageAndOffset.getOffset();

    // Decode the key, substituting the empty string when the message has no key.
    byte[] keyBytes = messageAndOffset.getKeyBytes();
    String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);

    // Decode the value the same way.
    byte[] payloadBytes = messageAndOffset.getMessageBytes();
    String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);

    // Combine offset, key, and value into one record and serialize it as JSON.
    KafkaRecord record = new KafkaRecord(offset, key, payload);
    return gson.toJson(record).getBytes(CHARSET);
}
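
Presumably one plugs that extractor in with a custom source, since KafkaSimpleSource is
hard-coded to return KafkaSimpleExtractor. Here is a minimal sketch of what we have in
mind; the subclass and package names are ours, and we are assuming KafkaSimpleJsonExtractor
exposes the same WorkUnitState constructor that KafkaSimpleExtractor does:

import java.io.IOException;

import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleJsonExtractor;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource;

// Hypothetical source that swaps in KafkaSimpleJsonExtractor so that
// decodeRecord emits offset, key, and value together as JSON.
public class KafkaSimpleJsonSource extends KafkaSimpleSource {
    @Override
    public Extractor<String, byte[]> getExtractor(WorkUnitState state) throws IOException {
        return new KafkaSimpleJsonExtractor(state);
    }
}

A job would then select it by setting source.class to this class instead of KafkaSimpleSource.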

The architecture diagram leads us to think that the extractor is the point where both the
Kafka key and value are visible in the Gobblin pipeline:
https://gobblin.readthedocs.io/en/latest/Gobblin-Architecture/#gobblin-constructs

We are eager to learn from you.


From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Thursday, April 12, 2018 9:45 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

For writing the data to HDFS, you need to take a look at the Writer implementation. The
Extractor is used for pulling the data, which is then passed through various stages before
it is stored via the Writer.
By “the other subscriber” I meant adding one more consumer that reads the key/value,
e.g. the kafka-console-consumer.sh that ships with Kafka.
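For example, something like this (an illustrative command; the broker address and topic
name are placeholders):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic \
    --from-beginning --property print.key=true

That prints each message's key alongside its value, so you can inspect what is in the
topic without touching the Gobblin pipeline.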
I was not aware earlier that you wanted to store the data in HDFS; since you had given a
reference to the Extractor, that was not clear to me.
Please take a look at the Kafka-to-HDFS writer and see if that helps. If it does not do
exactly what you need, you may have to plug in a customized writer.
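The writer is chosen in the job configuration, so a customized writer would be plugged in
along these lines (the class name below is a placeholder for your own implementation):

# quick-start default:
# writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
# customized writer:
writer.builder.class=com.yourcompany.gobblin.CustomDataWriterBuilder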


On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <Jack_Kidwelljr@comcast.com> wrote:
We want to extract both key and value from a Kafka message and publish the combined information
to HDFS. Keys contain numeric ids for the strings contained in values.

Please explain “other subscriber”. Are you proposing a different Kafka message structure?

KafkaSimpleJsonExtractor.java caught my attention because it extracts both keys and values
and puts them in decodedRecord.
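
If we read the code correctly, each record would then land in HDFS as a small JSON
document, something like this (field names assumed from KafkaRecord's members):

{"offset": 42, "key": "1001", "value": "some payload text"}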

From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Thursday, April 12, 2018 1:17 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

I am not sure what you are asking for. Do you want to see the keys/values rendered in the
logs while the extraction is being done?
Why not have another subscriber to Kafka render the values, rather than having the
KafkaExtractor implementation render them?


On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <Jack_Kidwelljr@comcast.com> wrote:
Hi,

Using gobblin_0.10.0, we want to use the module KafkaSimpleJsonExtractor.java in order to
see both Kafka record keys and values.

How does one configure a job to achieve it?
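
Our job configuration so far follows the Kafka-HDFS ingestion quick start, roughly like
this (trimmed; broker address and job names are examples):

job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false

kafka.brokers=localhost:9092

source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=org.apache.gobblin.extract.kafka

writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt

data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher

We do not see a property in it that selects KafkaSimpleJsonExtractor instead of
KafkaSimpleExtractor.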


