Date: Fri, 13 Mar 2020 16:52:00 +0000 (UTC)
From: "Joe Witt (Jira)"
To: issues@nifi.apache.org
Subject: [jira] [Updated] (NIFI-7249) [Regression] AvroReader: Could not parse incoming data

    [ https://issues.apache.org/jira/browse/NIFI-7249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joe Witt updated NIFI-7249:
---------------------------
    Fix Version/s: 1.11.4

> [Regression] AvroReader: Could not parse incoming data
> ------------------------------------------------------
>
>                 Key: NIFI-7249
>                 URL: https://issues.apache.org/jira/browse/NIFI-7249
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>         Environment: Debian, Java 11 and Java 8
>            Reporter: Philipp Leufke
>            Assignee: Matt Burgess
>            Priority: Major
>             Fix For: 1.12.0, 1.11.4
>
>         Attachments: AvroReader_bug_MWE.xml
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> *Assessment*
> {code:java}
> AvroTypeUtil.convertUnionFieldValue
> {code}
> has the following:
> {code:java}
> Optional mostSuitableType = DataTypeUtils.findMostSuitableType(
>         originalValue,
>         fieldSchema.getTypes().stream().filter(schema -> schema.getType() != Type.NULL).collect(Collectors.toList()),
>         subSchema -> AvroTypeUtil.determineDataType(subSchema)
> );
> {code}
> which in turn has the following:
> {code:java}
> DataType inferredDataType = inferDataType(value, null);
> {code}
> which in turn has the following:
> {code:java}
> if (value instanceof Map) {
>     final Map map = (Map) value;
> {code}
> {{originalValue/value}} is a map extracted from an Avro record that has {{Utf8}} keys instead of {{String}} keys.
> In general, however, the issue is that we are dealing with an *Avro-specific object* where previously *only NiFi-specific value objects were processed*.
> There are multiple approaches to fix this:
> # Consider this special case a technical issue. We accept the fact that Avro objects can leak into this layer and prepare the layer to behave as needed, i.e. transform the Avro map into another map whose keys are {{String}} objects.
> # Consider this an error-handling issue. Inference can be treated as a best-effort attempt, and in case of an error we can fall back to the original logic. Inference was added here to be able to choose the best matching type from a UNION/CHOICE. If inference doesn't yield a result, the original logic goes over all types within the UNION/CHOICE and selects the _first compatible_ one. When a Map is in a UNION/CHOICE, the other types will not pose compatibility issues, so the original logic would work well.
> # Enhance the inference logic so that the Avro object is converted to a general object before inference occurs. This would prevent Avro (or other third-party-specific) objects from leaking into the framework's format-agnostic layer.
> (1. and 2. are not mutually exclusive.)
> ----
> *Issue report*
> Severe regression in version 1.11.3, compared to 1.9.2:
> Record-based processors can no longer deserialize Avro messages. Examples:
> * ConsumeKafkaRecord: with embedded Avro schema or using Confluent Schema Registry
> * ConvertRecord: with embedded Avro schema or using Confluent Schema Registry, too
> * probably others as well...
> Error messages:
> {noformat}
> ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d]
> Failed to process StandardFlowFileRecord[uuid=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1584002690648-1091, container=default, section=67], offset=276387, length=3487]
> ,offset=0,name=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,size=3487]; will route to failure:
> Could not parse incoming data
> {noformat}
> {noformat}
> ConsumeKafkaRecord_2_0[id=d9ebdbda-51b7-38ce-b43e-3197322bd2e1]
> Failed to parse message from Kafka using the configured Record Reader.
> Will route message as its own FlowFile to the 'parse.failure' relationship: org.apache.nifi.serialization.MalformedRecordException:
> Error while getting next record. Root cause: java.lang.ClassCastException
> {noformat}
>
> However, messages with an embedded schema can be converted to JSON without errors using ConvertAvroToJson.
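The first approach listed in the assessment (or the third, if applied recursively to nested values) could look roughly like the sketch below. It is a minimal, hypothetical illustration and not necessarily how NIFI-7249 was actually fixed: the class name `AvroValueNormalizer` and its `normalize` method are invented for this example. The sketch relies only on the fact that Avro's `org.apache.avro.util.Utf8` implements `CharSequence`; to avoid an Avro dependency, `StringBuilder` (another `CharSequence` that is not a `String`) stands in for `Utf8`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of NiFi: converts Avro-specific string values
// (any non-String CharSequence such as Utf8) into plain java.lang.String objects
// so that format-agnostic code can safely cast map keys to String.
final class AvroValueNormalizer {

    static Object normalize(Object value) {
        if (value instanceof CharSequence) {
            // Utf8.toString() yields the decoded String; for a String this is a no-op.
            return value.toString();
        }
        if (value instanceof Map) {
            // Rebuild the map with String keys and recursively normalized values.
            Map<String, Object> result = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                Object key = entry.getKey();
                result.put(key == null ? null : key.toString(), normalize(entry.getValue()));
            }
            return result;
        }
        if (value instanceof List) {
            // Avro arrays are Lists; normalize each element.
            List<Object> result = new ArrayList<>();
            for (Object element : (List<?>) value) {
                result.add(normalize(element));
            }
            return result;
        }
        // Numbers, booleans, byte buffers, nested records, etc. are returned unchanged.
        return value;
    }

    public static void main(String[] args) {
        // StringBuilder stands in for org.apache.avro.util.Utf8 (both are CharSequences, neither is a String).
        Map<Object, Object> avroLikeMap = new LinkedHashMap<>();
        avroLikeMap.put(new StringBuilder("env"), new StringBuilder("prod"));
        avroLikeMap.put(new StringBuilder("team"), new StringBuilder("data"));

        // Casting a raw key fails, mirroring the ClassCastException in the stack trace.
        try {
            String firstKey = (String) avroLikeMap.keySet().iterator().next();
            System.out.println("unexpected: " + firstKey);
        } catch (ClassCastException e) {
            System.out.println("raw map: ClassCastException");
        }

        // After normalization every key is a String, so the same access succeeds.
        @SuppressWarnings("unchecked")
        Map<String, Object> normalized = (Map<String, Object>) normalize(avroLikeMap);
        for (Map.Entry<String, Object> entry : normalized.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

Running `main` prints `raw map: ClassCastException`, followed by `env -> prod` and `team -> data`. Normalizing before inference, rather than only catching the exception (the second approach), keeps Avro types out of the format-agnostic layer entirely. The cost is an extra copy of each map or array value.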
>
> The behavior has been confirmed with several different flows and configurations, and with different Java versions. Downgrading to NiFi 1.9.2 resolves the issue; a subsequent upgrade to 1.11.3 brings it back.
>
> Please find attached a minimal example template (AvroReader_bug_MWE.xml).
>
> Stack traces:
>
> {noformat}
> 2020-03-12 09:37:16,628 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.avro.AvroTypeUtil fail to convert field tags
> java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
>     at org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
>     at org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
>     at org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
>     at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
>     at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
>     at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
>     at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
>     at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
>     at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
>     at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
>     at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
>     at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
>     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>     at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> 2020-03-12 09:37:16,632 ERROR [Timer-Driven Process Thread-4] o.a.n.processors.standard.ConvertRecord ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d] Failed to process StandardFlowFileRecord[uuid=33856f9d-1991-4c95-90c2-3ffd032fc840,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1584005835899-1, container=default, section=1], offset=851, length=3487],offset=0,name=33856f9d-1991-4c95-90c2-3ffd032fc840,size=3487]; will route to failure: org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
> org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
>     at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:171)
>     at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
>     at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
>     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>     at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
>     at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:52)
>     at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
>     at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
>     ... 13 common frames omitted
> Caused by: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
>     at org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
>     at org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
>     at org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
>     at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
>     at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
>     at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
>     at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
>     at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
>     ... 15 common frames omitted
> {noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)