flink-user-zh mailing list archives

From 叶贤勋 <yxx_c...@163.com>
Subject Re: DML deduplication, error at translate time
Date Fri, 22 Nov 2019 02:10:53 GMT
Hi JingsongLee, 晓令:
    Thank you both for your answers.

    For reference, the issue link: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-14899?filter=allissues


叶贤勋
yxx_cmhd@163.com


On 2019-11-21 at 22:00, 贺小令(晓令) <xiaoling.hxl@alibaba-inc.com> wrote:
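Hi 叶贤勋:

Your SQL first sorts with ORDER BY proc desc and then keeps rownum = 1, which is
equivalent to last-row logic. This produces retractions, but the result table
(user_dist) does not define any primary key (pk) information, and that combination
is not supported; this is the error you are seeing.

If you change ORDER BY proc desc to ORDER BY proc asc and keep rownum = 1, the query
becomes equivalent to first-row logic, which does not produce retractions, so the
result table (user_dist) can meet the requirements.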
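
For illustration, the change described above applied to your SQL3 might look like the following. This is only a sketch: it alters nothing but the sort direction, all table and column names are taken from your message, and it has not been run against Flink 1.9.0.

```sql
-- Sketch only: SQL3 from the original question with the sort direction flipped.
-- With ORDER BY proc ASC, rownum = 1 keeps the first row seen per partition key.
INSERT INTO user_dist
SELECT
  dt,
  user_id,
  behavior
FROM (
  SELECT
    dt,
    user_id,
    behavior,
    ROW_NUMBER() OVER (
      PARTITION BY dt, user_id, behavior
      ORDER BY proc ASC  -- was: ORDER BY proc desc (keeps the last row)
    ) AS rownum
  FROM (
    SELECT
      DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') AS dt,
      user_id,
      behavior,
      PROCTIME() AS proc
    FROM user_log
  )
)
WHERE rownum = 1;
```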

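However, the blink planner currently has a problem handling PROCTIME(): during SQL
optimization the PROCTIME() attribute is dropped and the column is treated as an
ordinary timestamp type, so the query is not translated into first-row logic. I have
created an issue to fix this.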

thanks,
godfrey


------------------------------------------------------------------
From: JingsongLee <lzljs3620320@aliyun.com.INVALID>
Sent: Thursday, 2019-11-21 18:44
To: user-zh <user-zh@flink.apache.org>; Jark Wu <imjark@gmail.com>; godfrey he (JIRA) <jira@apache.org>
Subject: Re: DML deduplication, error at translate time

Hi 叶贤勋:

Deduplication does currently support the INSERT INTO ... SELECT syntax.
The question is why your SQL does not produce a UniqueKey.
There may be a blink-planner bug here.
CC: @Jark Wu @godfrey he (JIRA)

Best,
Jingsong Lee


------------------------------------------------------------------
From:叶贤勋 <yxx_cmhd@163.com>
Send Time: Thursday, 2019-11-21 16:20
To: user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: DML deduplication, error at translate time

Hi everyone:
Flink version: 1.9.0
SQL1:
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
SQL2:

CREATE TABLE user_dist (
dt VARCHAR,
user_id VARCHAR,
behavior VARCHAR
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
'connector.table' = 'user_behavior_dup',
'connector.username' = 'root',
'connector.password' = '******',
'connector.write.flush.max-rows' = '1'
);
SQL3:

INSERT INTO user_dist
SELECT
dt,
user_id,
behavior
FROM (
SELECT
dt,
user_id,
behavior,
ROW_NUMBER() OVER (PARTITION BY dt, user_id, behavior ORDER BY proc desc ) AS rownum
FROM (select DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') as dt,user_id,behavior,PROCTIME() as proc
from user_log) )
WHERE rownum = 1;


Running tableEnv.sqlUpdate(SQL3) on SQL3 throws the following error:
Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink
requires that Table has a full primary keys if it is updated.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)


Does deduplication not currently support the INSERT INTO ... SELECT syntax?


叶贤勋
yxx_cmhd@163.com

