flink-user-zh mailing list archives

From Zhenghua Gao <doc...@gmail.com>
Subject Re: Flink 1.10 JSON parsing
Date Thu, 26 Mar 2020 08:11:06 GMT
Hi 张宇

This looks like a bug in TypeMappingUtils when it validates a field's physical type against its logical type.
I have opened an issue for it: https://issues.apache.org/jira/browse/FLINK-16800
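
Until the fix is released, one possible workaround (a rough, untested sketch, not something I have verified against your job) is to declare the nested field as STRING in the table schema, so the derived physical type matches, and cast it back to DECIMAL in the query:

        // untested workaround sketch: read logistics_status as STRING ...
        .field("old", DataTypes.ARRAY(DataTypes.ROW(
                DataTypes.FIELD("logistics_status", DataTypes.STRING()))))

        // ... and cast it when reading (array subscripts are 1-based):
        Table t = streamTableEnv.sqlQuery(
                "SELECT CAST(`old`[1].logistics_status AS DECIMAL(38, 18)) FROM Test");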

*Best Regards,*
*Zhenghua Gao*


On Fri, Mar 20, 2020 at 5:28 PM 宇张 <zhangyu@akulaku.com> wrote:

> hi,
> Understood. Let me tidy it up and post it again:
> streamTableEnv
>         .connect(
>                 new Kafka()
>                         .version("0.11")
>                         .topic("mysql_binlog_test")
>                         .startFromEarliest()
>                         .property("zookeeper.connect", "localhost:2181")
>                         .property("bootstrap.servers", "localhost:9092")
>         )
>         .withFormat(
>                 new Json()
>         )
>         .withSchema(
>                 new Schema()
>                         .field("business", DataTypes.STRING())
>                         .field("data", DataTypes.ARRAY(DataTypes.ROW(
>                                 DataTypes.FIELD("id", DataTypes.BIGINT()),
>                                 DataTypes.FIELD("vendor_id", DataTypes.DOUBLE()),
>                                 DataTypes.FIELD("status", DataTypes.BIGINT()),
>                                 DataTypes.FIELD("create_time", DataTypes.BIGINT()),
>                                 DataTypes.FIELD("tracking_number", DataTypes.STRING()),
>                                 DataTypes.FIELD("invoice_no", DataTypes.STRING()),
>                                 DataTypes.FIELD("parent_id", DataTypes.BIGINT()))))
>                         .field("database", DataTypes.STRING())
>                         .field("old", DataTypes.ARRAY(DataTypes.ROW(
>                                 DataTypes.FIELD("logistics_status", DataTypes.DECIMAL(38, 18)))))
>                         .field("table", DataTypes.STRING())
>                         .field("ts", DataTypes.BIGINT())
>                         .field("type", DataTypes.STRING())
>                         .field("putRowNum", DataTypes.BIGINT())
>         )
>         .createTemporaryTable("Test");
> When the sub-field inside the composite 'old' field uses DECIMAL, an exception is thrown; any other type works fine.
> The exception:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type ARRAY<ROW<`logistics_status` DECIMAL(38, 18)>> of table field 'old'
> does not match with the physical type ARRAY<ROW<`logistics_status`
> LEGACY('DECIMAL', 'DECIMAL')>> of the 'old' field of the TableSource return type.
> at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
> at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
> at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
> at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:157)
> at org.apache.flink.table.types.logical.ArrayType.accept(ArrayType.java:110)
> at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
> at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
> at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
> at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
> at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:161)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.scala:51)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLimit.translateToPlan(StreamExecLimit.scala:51)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
> at com.akulaku.data.main.StreamMain.main(StreamMain.java:58)
>
>
> On Fri, Mar 20, 2020 at 4:43 PM Jark Wu <imjark@gmail.com> wrote:
>
> > Hi,
> >
> > The images you attached are all broken... I suggest pasting the text directly, or uploading the images to an image hosting service first and pasting the links here.
> >
> > 1. What error does DECIMAL throw exactly?
> > 2. If you keep the jsonSchema, you have to make sure the table schema and the
> > json schema are consistent, i.e. not only must the table schema be correct,
> > the json schema must be correct as well.
> >     That adds quite a lot of extra cost, so the general recommendation is not
> > to configure a jsonSchema. In theory the table schema can map every complex format.
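> >
> > For instance, a rough sketch (untested, assuming the "Test" table defined
> > earlier in this thread) of reading nested array fields that are mapped purely
> > through the table schema:
> >
> >     // untested sketch: 1-based array subscript plus row-field access
> >     Table t = streamTableEnv.sqlQuery(
> >             "SELECT `data`[1].tracking_number, `data`[1].invoice_no FROM Test");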
> >
> > Best,
> > Jark
> >
> >
> > On Fri, 20 Mar 2020 at 14:48, 宇张 <zhangyu@akulaku.com> wrote:
> >
> > > hi,
> > > OK. Testing shows that DECIMAL cannot be used at all, not even DECIMAL(38, 18); switching to any other type works. I am not sure whether this is a bug.
> > > [image: image.png]
> > >
> > > On Fri, Mar 20, 2020 at 2:17 PM 宇张 <zhangyu@akulaku.com> wrote:
> > >
> > >> hi, I tried again on my side. When the JSON data contains numeric types, the
> > >> program still fails to run, even after changing the schema of data to
> > >> ARRAY(ROW(...)) and removing .jsonSchema(...); without numeric types it works.
> > >> Judging from the error output the two structures do line up, but the
> > >> validation apparently does not pass.
> > >> [image: image.png]
> > >>
> > >>
> > >> On Fri, Mar 20, 2020 at 12:08 PM 宇张 <zhangyu@akulaku.com> wrote:
> > >>
> > >>> hi,
> > >>> OK, I gave it a try on my side. After changing the schema of data to
> > >>> ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING)))
> > >>> and removing .jsonSchema(...), the data now parses without problems. But if
> > >>> I keep .jsonSchema(...), the following exception is thrown: Exception in
> > >>> thread "main" org.apache.flink.table.api.ValidationException: Type
> > >>> ARRAY<ROW<`tracking_number` STRING, `invoice_no` STRING>> of table field
> > >>> 'data' does not match with the physical type ROW<`f0` ROW<`tracking_number`
> > >>> STRING, `invoice_no` STRING>> of the 'data' field of the TableSource return type.
> > >>>
> > >>> The reason I keep this jsonSchema is that I want to store the metadata of
> > >>> this kind of complex JSON source in Hive, and then use it to infer the layout
> > >>> of the SQL below, because I do not know how to map the field information of
> > >>> such complex JSON when defining the SQL below. Or is there an example of such
> > >>> a complex-JSON SQL mapping? Thanks.
> > >>> [image: image.png]
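> > >>>
> > >>> What I have in mind is something like the following DDL-based mapping (a
> > >>> rough, untested sketch; the connector property keys are my assumption based
> > >>> on the 1.10 Kafka descriptor documentation):
> > >>>
> > >>> // untested sketch: DDL-based mapping for the complex JSON in this thread
> > >>> streamTableEnv.sqlUpdate(
> > >>>         "CREATE TABLE Test (\n"
> > >>>         + "  business STRING,\n"
> > >>>         + "  `data` ARRAY<ROW<tracking_number STRING, invoice_no STRING>>,\n"
> > >>>         + "  `database` STRING,\n"
> > >>>         + "  `table` STRING,\n"
> > >>>         + "  ts BIGINT,\n"
> > >>>         + "  `type` STRING,\n"
> > >>>         + "  putRowNum BIGINT\n"
> > >>>         + ") WITH (\n"
> > >>>         + "  'connector.type' = 'kafka',\n"
> > >>>         + "  'connector.version' = '0.11',\n"
> > >>>         + "  'connector.topic' = 'mysql_binlog_test_str',\n"
> > >>>         + "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n"
> > >>>         + "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n"
> > >>>         + "  'connector.startup-mode' = 'earliest-offset',\n"
> > >>>         + "  'format.type' = 'json'\n"
> > >>>         + ")");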
> > >>>
> > >>> On Fri, Mar 20, 2020 at 11:42 AM Jark Wu <imjark@gmail.com> wrote:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> Looking at your data, "data" is of type array<row>, so the schema
> > >>>> definition for data needs to be changed to
> > >>>> ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING)))
> > >>>> Also, I suggest removing .jsonSchema(...): since 1.10, flink-json supports
> > >>>> deriving the json schema automatically from the table schema.
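> > >>>>
> > >>>> In code, the change would look roughly like this (untested sketch of the
> > >>>> suggestion above):
> > >>>>
> > >>>>         // "data" is an array of rows, not a row wrapping a row
> > >>>>         .field("data", DataTypes.ARRAY(DataTypes.ROW(
> > >>>>                 DataTypes.FIELD("tracking_number", DataTypes.STRING()),
> > >>>>                 DataTypes.FIELD("invoice_no", DataTypes.STRING()))))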
> > >>>>
> > >>>> Best,
> > >>>> Jark
> > >>>>
> > >>>> On Fri, 20 Mar 2020 at 11:34, 宇张 <zhangyu@akulaku.com> wrote:
> > >>>>
> > >>>> > hi:
> > >>>> > 1. When parsing the JSON data, why is decimal used here rather than bigint?
> > >>>> > [image: image.png]
> > >>>> > 2. When I use connect, parsing JSON array elements throws an exception. Is this caused by misuse on my side, or is it a bug?
> > >>>> >
> > >>>> > json: {"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":19999}
> > >>>> >
> > >>>> > jsonSchema: {"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
> > >>>> > connect:
> > >>>> >
> > >>>> > streamTableEnv
> > >>>> >         .connect(
> > >>>> >                 new Kafka()
> > >>>> >                         .version("0.11")
> > >>>> >                         .topic("mysql_binlog_test_str")
> > >>>> >                         .startFromEarliest()
> > >>>> >                         .property("zookeeper.connect", "localhost:2181")
> > >>>> >                         .property("bootstrap.servers", "localhost:9092")
> > >>>> >         )
> > >>>> >         .withFormat(
> > >>>> >                 new Json()
> > >>>> >                         .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
> > >>>> >         )
> > >>>> >         .withSchema(
> > >>>> >                 new Schema()
> > >>>> >                         .field("business", DataTypes.STRING())
> > >>>> >                         .field("data", DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
> > >>>> >                                 DataTypes.FIELD("tracking_number", DataTypes.STRING()),
> > >>>> >                                 DataTypes.FIELD("invoice_no", DataTypes.STRING())))))
> > >>>> >                         .field("database", DataTypes.STRING())
> > >>>> >                         .field("table", DataTypes.STRING())
> > >>>> >                         .field("ts", DataTypes.DECIMAL(38, 18))
> > >>>> >                         .field("type", DataTypes.STRING())
> > >>>> >                         .field("putRowNum", DataTypes.DECIMAL(38, 18))
> > >>>> >         )
> > >>>> >         .createTemporaryTable("Test");
> > >>>> >
> > >>>> > The exception: Caused by: java.io.IOException: Failed to deserialize JSON object.
> > >>>> >
> > >>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> > >>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> > >>>> > at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> > >>>> > at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
> > >>>> > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> > >>>> > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> > >>>> > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> > >>>> > at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> > >>>> > Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
> > >>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
> > >>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> > >>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> > >>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> > >>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> > >>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> > >>>> > ... 7 more
> > >>>> >
> > >>>> >
> > >>>> >
> > >>>>
> > >>>
> >
>
