flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhisheng <zhisheng2...@gmail.com>
Subject Re: Field types of query result and registered TableSink [Result] do not match
Date Thu, 19 Mar 2020 01:33:25 GMT
hi, Jark

我刚使用 1.10.0 测试,报错异常如下:

Exception in thread "main" org.apache.flink.table.api.ValidationException:
Type DECIMAL(5, 2) of table field 'yidun_score' does not match with the
physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field of
the TableSink consumed type.
at
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
at
org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265)
at
org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254)
at
org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102)
at
org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
at
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
at scala.Option.map(Option.scala:146)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49)
Caused by: org.apache.flink.table.api.ValidationException: Legacy decimal
type can only be mapped to DECIMAL(38, 18).
... 26 more

看报错日志像是只能使用 DECIMAL(38, 18),不然映射会失败,然后我将表定义成
DECIMAL(38, 18) 类型,就不报错了。

看了下源码是 TypeMappingUtils 类中的 checkIfCompatible 做了校验

http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png

看了下这个 private 方法的上层是 checkPhysicalLogicalTypeCompatible 方法,这个方法的注释说:

Checks whether the given physical field type and logical field type are
compatible at the edges of the table ecosystem. Types are still compatible
if the physical type is a legacy decimal type (converted from
Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to support
legacy TypeInformation for TableSource and TableSink.

看起来像是在兼容旧的 TypeInformation

zhisheng <zhisheng2018@gmail.com> 于2020年3月19日周四 上午8:31写道:

> 对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10
blink planner
>
> Jark Wu <imjark@gmail.com> 于2020年3月18日周三 下午11:47写道:
>
>> Hi zhisheng,
>>
>> 你用的是1.9吗? 试过 1.10.0 blink planner 么?
>>
>> On Wed, 18 Mar 2020 at 22:21, zhisheng <zhisheng2018@gmail.com> wrote:
>>
>> > hi, all
>> >
>> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入
PostgreSQL 的 DDL
>> yidun_score
>> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> method
>> > caused an error: Field types of query result and registered TableSink
>> > [Result] do not match.
>> > Query result schema: [user_new_id: Long, total_credit_score: Integer,
>> > total_order_count: Integer, loss_total_order_count: Integer,
>> yidun_score:
>> > BigDecimal, is_delete: Boolean]
>> > TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
>> > total_order_count: Integer, loss_total_order_count: Integer,
>> yidun_score:
>> > BigDecimal, is_delete: Boolean]
>> > at
>> >
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>> > at
>> >
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>> > at
>> >
>> >
>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>> > at
>> >
>> >
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>> > at
>> >
>> >
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
>> > at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
>> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>> > at
>> >
>> >
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>> > at
>> >
>> >
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>> > at java.security.AccessController.doPrivileged(Native Method)
>> > at javax.security.auth.Subject.doAs(Subject.java:422)
>> > at
>> >
>> >
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>> > at
>> >
>> >
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>> > Caused by: org.apache.flink.table.api.ValidationException: Field types
>> of
>> > query result and registered TableSink [Result] do not match.
>> > Query result schema: [user_new_id: Long, total_credit_score: Integer,
>> > total_order_count: Integer, loss_total_order_count: Integer,
>> yidun_score:
>> > BigDecimal, is_delete: Boolean]
>> > TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
>> > total_order_count: Integer, loss_total_order_count: Integer,
>> yidun_score:
>> > BigDecimal, is_delete: Boolean]
>> > at
>> >
>> >
>> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
>> > at
>> >
>> >
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
>> > at
>> >
>> >
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
>> > at scala.Option.map(Option.scala:146)
>> > at
>> >
>> >
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
>> > at
>> >
>> >
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
>> > at
>> >
>> >
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >
>> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入
PG,不知道这个 case 是不是一个
>> > bug?
>> >
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message