nifi-users mailing list archives

From Joe Witt <joe.w...@gmail.com>
Subject Re: GG kafka topic in avro format to NiFi
Date Thu, 22 Mar 2018 16:34:48 GMT
Colin,

You would not have seen this error before from the Kafka processor you
were using, as it was not doing any deserialization.  You would
have seen that error downstream in the LookupRecord processor you had.
Just wanted to clarify that point.

That you're seeing the error now with ConsumeKafkaRecord tells us that
the schema and/or format you're configuring it to read does not align
with what is actually in Kafka.  Possible causes: it is simply the
wrong or incompatible schema, or the producer is actually writing the
Avro schema along with the payload while the reader is configured to
read only the bare record/payload.  You can pull the raw data from
Kafka in bytes and take a look at it to be sure it is what you
expect.  Or, you can use NiFi's click-to-content/provenance
capabilities to look at the raw records as they are sent to the
'parse.failure' relationship called out in the error.  This should
make troubleshooting really easy.
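
For example, a minimal sketch of that raw-byte check (the broker address,
group id, and topic name below are hypothetical placeholders), using the
plain Kafka consumer with byte-array deserializers and inspecting the leading
bytes of one message: an Avro data file starts with the magic bytes "Obj"
plus 0x01 (schema embedded), a Confluent-registry-style payload starts with a
0x00 magic byte followed by a 4-byte schema id, and anything else is likely a
bare Avro record:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class InspectRawAvro {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker
            props.put("group.id", "avro-format-check");        // hypothetical group
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-gg-topic")); // hypothetical topic
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(10000)) {
                    byte[] v = rec.value();
                    if (v.length > 3 && v[0] == 'O' && v[1] == 'b' && v[2] == 'j' && v[3] == 1) {
                        System.out.println("Avro data file: schema embedded in payload");
                    } else if (v.length > 4 && v[0] == 0x00) {
                        System.out.println("Looks like Confluent wire format (magic 0 + schema id)");
                    } else {
                        System.out.println("Likely a bare Avro record; first bytes: "
                            + Arrays.toString(Arrays.copyOf(v, Math.min(v.length, 8))));
                    }
                    return; // one message is enough to identify the framing
                }
            }
        }
    }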

Can you describe how the data is getting into Kafka and what is
producing it?  That might make the problem easier to diagnose as well.

Thanks
joe

On Thu, Mar 22, 2018 at 12:28 PM, Colin Williams
<colin.williams.seattle@gmail.com> wrote:
> Hi,
>
> I configured ConsumeKafkaRecord and set the schema name to the actual value. I
> think something had been preventing me from editing its value previously.
>
> Anyhow, I have what looks like a similar error message:
>
>
> 2018-03-22 09:24:24,990 ERROR [Timer-Driven Process Thread-6] o.a.n.p.k.pubsub.ConsumeKafkaRecord_0_10 ConsumeKafkaRecord_0_10[id=0162106b-f849-104c-4246-f70e5b8e320b] Failed to parse message from Kafka using the configured Record Reader. Will route message as its own FlowFile to the 'parse.failure' relationship: java.io.IOException: Invalid int encoding
> java.io.IOException: Invalid int encoding
>     at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:145)
>     at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259)
>     at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
>     at org.apache.nifi.avro.AvroReaderWithExplicitSchema.nextAvroRecord(AvroReaderWithExplicitSchema.java:61)
>     at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:36)
>     at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:474)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$2(ConsumerLease.java:322)
>     at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
>     at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:309)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:170)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.onTrigger(ConsumeKafkaRecord_0_10.java:327)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
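
A side note on that trace: "Invalid int encoding" thrown from
BinaryDecoder.readInt while reading the first string field is the classic
symptom of pointing a bare-record decoder at bytes that start with framing
(for example an Avro data file header) rather than at the record itself. A
minimal sketch that tries both framings against a single payload, assuming
the schema and the raw message bytes have already been loaded:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;

    public class TryBothFramings {
        // schema: parsed from the INV_ADJ avsc; payload: one raw Kafka message value
        static void decode(Schema schema, byte[] payload) throws IOException {
            try {
                // 1) bare record -- what AvroReaderWithExplicitSchema attempts
                BinaryDecoder dec = DecoderFactory.get().binaryDecoder(payload, null);
                GenericRecord r = new GenericDatumReader<GenericRecord>(schema).read(null, dec);
                System.out.println("bare record decoded OK: " + r);
            } catch (IOException e) {
                System.out.println("bare decode failed (" + e.getMessage()
                    + "), trying data file framing");
                // 2) data file -- the writer schema travels with the payload
                try (DataFileStream<GenericRecord> dfs = new DataFileStream<>(
                        new ByteArrayInputStream(payload), new GenericDatumReader<>())) {
                    System.out.println("data file OK, writer schema: "
                        + dfs.getSchema().getFullName());
                }
            }
        }
    }

If the second branch succeeds, the topic carries schema-embedded Avro and the
reader needs to be configured for an embedded schema rather than a bare record.
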
> On Thu, Mar 22, 2018 at 4:29 AM, Bryan Bende <bbende@gmail.com> wrote:
>>
>> Hello,
>>
>> You don’t need to use a properties file. In your AvroRecordReader, just
>> change the Schema Name property from ${schema.name} to the actual name of
>> the schema you want to use from the schema registry.
>>
>> It just means that the record reader can only be used with one schema now,
>> rather than trying to dynamically determine the schema from a flow file
>> attribute, which it can’t do in this case because there is no incoming flow
>> file to ConsumeKafkaRecord.
>>
>> -Bryan
>>
>>
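
Concretely, the AvroReader controller service settings would then look
roughly like this (property names as in NiFi 1.x; the registry and schema
names here are illustrative, so use whatever name the schema was actually
registered under):

    Schema Access Strategy : Use 'Schema Name' Property
    Schema Registry        : AvroSchemaRegistry (the service holding the avsc)
    Schema Name            : INV_ADJ            (a literal name, not ${schema.name})
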
>> On Thu, Mar 22, 2018 at 2:41 AM Colin Williams
>> <colin.williams.seattle@gmail.com> wrote:
>>>
>>> Hi Joe,
>>>
>>> Thanks for the suggestion. I started by using ConsumeKafkaRecord_0_10.
>>> But I had read that the only way to configure schema.name was via a
>>> properties file, which I read also required a restart of NiFi:
>>> http://apache-nifi-users-list.2361937.n4.nabble.com/Nifi-1-3-0-Problem-with-schema-name-and-ConsumeKafkaRecord-0-10-processor-td2256.html
>>> That's why I moved away from ConsumeKafkaRecord to the regular consumer.
>>> I didn't want to create a properties file and couldn't see how to set
>>> schema.name otherwise.
>>>
>>> Regarding the error information: I saw the error displayed at the
>>> ConsumeKafka processor in the UI. I will look for log files and, if
>>> necessary, configure logging after setting up ConsumeKafkaRecord via a
>>> properties file tomorrow.
>>>
>>> Best,
>>>
>>> Colin Williams
>>>
>>> On Wed, Mar 21, 2018 at 6:45 PM, Joe Witt <joe.witt@gmail.com> wrote:
>>>>
>>>> Colin,
>>>>
>>>> You're using the ConsumeKafka processors.  Given that this is Avro
>>>> data for which you have a schema/etc.. I strongly recommend you use
>>>> ConsumeKafkaRecord_0_10...
>>>>
>>>> With that processor you get to specify the record reader/writer you'll
>>>> need.  You will also see dramatically higher performance.
>>>>
>>>> Let's get you reliably reading records from Kafka and then move on to
>>>> other details such as LookupRecord/etc.
>>>>
>>>> I suspect we'll need to see the actual error information you're
>>>> getting to be of much help.
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>> On Wed, Mar 21, 2018 at 9:33 PM, Colin Williams
>>>> <colin.williams.seattle@gmail.com> wrote:
>>>> > Hi Joe,
>>>> >
>>>> > I don't believe the Avro schema is included, and expect the messages
>>>> > are just the data portion... I think that's why I need to use the avsc
>>>> > file mentioned above...
>>>> >
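
As a quick sanity check on that avsc, plain Avro can confirm the file parses
before it goes into the NiFi schema registry (the file path here is a
hypothetical placeholder):

    import java.io.File;
    import org.apache.avro.Schema;

    public class ParseAvsc {
        public static void main(String[] args) throws Exception {
            // throws SchemaParseException if the avsc is malformed
            Schema s = new Schema.Parser().parse(new File("INV_ADJ.avsc"));
            System.out.println(s.getFullName()); // expect NSP_SCH.INV_ADJ
        }
    }
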
>>>> >> On Wed, Mar 21, 2018 at 6:19 PM, Joe Witt <joe.witt@gmail.com> wrote:
>>>> >>
>>>> >> Can you share a template of your process group?
>>>> >>
>>>> >> Do the messages in Kafka have the Avro schema included in them or are
>>>> >> they just the data portion of the record?
>>>> >>
>>>> >> On Wed, Mar 21, 2018 at 9:16 PM, Colin Williams
>>>> >> <colin.williams.seattle@gmail.com> wrote:
>>>> >> > I have an avro avsc file for a table with a definition like:
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> > {"type":"record","name":"INV_ADJ","namespace":"NSP_SCH","fields":[{"name":"table","type":"string"},{"name":"op_type","type":"string"},{"name":"op_ts","type":"string"},{"name":"current_ts","type":"string"},{"name":"pos","type":"string"},{"name":"primary_keys","type":{"type":"array","items":"string"}},{"name":"tokens","type":{"type":"map","values":"string"},"default":{}},{"name":"before","type":["null",{"type":"record","name":"columns","fields":[{"name":"ITEM","type":["null","string"],"default":null},{"name":"ITEM_isMissing","type":"boolean"},{"name":"INV_STATUS","type":["null","long"],"default":null},{"name":"INV_STATUS_isMissing","type":"boolean"},{"name":"LOC_TYPE","type":["null","string"],"default":null},{"name":"LOC_TYPE_isMissing","type":"boolean"},{"name":"LOCATION","type":["null","long"],"default":null},{"name":"LOCATION_isMissing","type":"boolean"},{"name":"ADJ_QTY","type":["null","double"],"default":null},{"name":"ADJ_QTY_isMissing","type":"boolean"},{"name":"REASON","type":["null","long"],"default":null},{"name":"REASON_isMissing","type":"boolean"},{"name":"ADJ_DATE","type":["null","string"],"default":null},{"name":"ADJ_DATE_isMissing","type":"boolean"},{"name":"PREV_QTY","type":["null","double"],"default":null},{"name":"PREV_QTY_isMissing","type":"boolean"},{"name":"USER_ID","type":["null","string"],"default":null},{"name":"USER_ID_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT","type":["null","double"],"default":null},{"name":"ADJ_WEIGHT_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT_UOM","type":["null","string"],"default":null},{"name":"ADJ_WEIGHT_UOM_isMissing","type":"boolean"},{"name":"CREATE_ID","type":["null","string"],"default":null},{"name":"CREATE_ID_isMissing","type":"boolean"},{"name":"CREATE_DATETIME","type":["null","string"],"default":null},{"name":"CREATE_DATETIME_isMissing","type":"boolean"}]}],"default":null},{"name":"after","type":["null","columns"],"default":null}]}
>>>> >> >
>>>> >> > I have a kafka topic which should contain avro records using the
>>>> >> > above definition.
>>>> >> >
>>>> >> > I've configured the avro registry, reader, and writer with the
>>>> >> > above definition. When I try using my nifi workflow I get exceptions
>>>> >> > like "invalid int encoding" and don't seem to process any data.
>>>> >> >
>>>> >> > What am I doing wrong?
>>>> >
>>>> >
>>>
>>>
>> --
>> Sent from Gmail Mobile
>
>
