flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "wind.fly.vip@outlook.com" <wind.fly....@outlook.com>
Subject flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
Date Mon, 18 May 2020 01:44:42 GMT
Hi, all:
       本人使用的flink 版本为1.10.0,planner为BlinkPlanner,用LEFT JOIN FOR SYSTEM_TIME
AS OF 语法关联维表:
       select TUMBLE_END(l.dt, INTERVAL '30' SECOND) as index_time, l.extra_info['cityCode']
as city_code, v.vehicle_level as vehicle_level, CAST(COUNT(DISTINCT req_body['driverId'])
as STRING) as index_value from x.log.yanfa_log AS l LEFT JOIN x.saic_auth_user.t_driver FOR
SYSTEM_TIME AS OF l.proctime AS d ON l.req_body['driverId'] = d.uid LEFT JOIN x.saic_cms_config.t_vehicle
FOR SYSTEM_TIME AS OF l.proctime AS v ON d.vin=v.vehicle_vin where l.ret_code = '0' and l.servicename
= 'MatchGtw.uploadLocationV4' and l.req_body['appId'] = 'saic_card' GROUP BY TUMBLE(l.dt,
INTERVAL '30' SECOND), l.extra_info['cityCode'], v.vehicle_level;
       建表语句用了computed columns:
       CREATE TABLE x.log.yanfa_log (
    dt TIMESTAMP(3),
    conn_id STRING,
    sequence STRING,
    trace_id STRING,
    span_info STRING,
    service_id STRING,
    msg_id STRING,
    servicename STRING,
    ret_code STRING,
    duration STRING,
    req_body MAP<String,String>,
    res_body MAP<STRING,STRING>,
    extra_info MAP<STRING,STRING>,
    proctime TIMESTAMP(3),
    WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = 'x-log-yanfa_log',
    'connector.properties.bootstrap.servers' = '******:9092',
    'connector.properties.zookeeper.connect' = '******:2181',
    'connector.startup-mode' = 'latest-offset',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.fail-on-missing-field' = 'true'
);
报如下异常:
       Caused by: org.apache.flink.table.api.TableException: Temporal table join currently
only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
       at org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
       at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:118)
       at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:131)
       at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
       at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
       at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
       at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
       at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
       at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
       at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
       at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
       at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
       at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
       at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
       at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
       at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
       at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
       ... 20 more


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message