flink-user-zh mailing list archives

From Jark Wu <imj...@gmail.com>
Subject Re: Primary key problem when writing to a PostgreSQL database after a Flink SQL GROUP BY
Date Wed, 01 Apr 2020 13:40:28 GMT
Hi Longfei,

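Sorry, this really isn't supported at the moment. The problem will be
addressed by the new TableSink interface introduced in FLIP-95, which we hope
to ship in 1.11.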
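
For illustration, here is a rough sketch of how the sink might be declared once the key can be stated explicitly in the DDL. The `PRIMARY KEY ... NOT ENFORCED` clause and the option names follow the Flink 1.11 JDBC connector syntax, so they are assumptions relative to this thread, which uses the 1.10 options. The key columns are taken from the business requirement in the question.

```sql
-- Sketch only: Flink 1.11-style JDBC sink with an explicitly declared key.
CREATE TABLE PRODUCT_RANK (
        RANK_ID BIGINT,
        WINDOW_START TIMESTAMP(3),
        WINDOW_END TIMESTAMP(3),
        PRODUCT_ID VARCHAR,
        TOTAL_NUM BIGINT,
        -- Assumed upsert key: one row per window and rank.
        PRIMARY KEY (WINDOW_START, RANK_ID) NOT ENFORCED
) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8',
        'table-name' = 'product_rank',
        'username' = 'xxxxx',
        'password' = 'xxxx',
        'sink.buffer-flush.max-rows' = '1'
);
```

With a declaration along these lines, the PostgreSQL table product_rank would presumably also need a matching primary key or unique constraint on (window_start, rank_id) for the upsert to resolve conflicts on those columns.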

Best,
Jark

On Wed, 1 Apr 2020 at 19:12, Longfei Zhou <zlfmail@vip.qq.com> wrote:

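> Problem:
> The SQL aggregates with GROUP BY over a time window and PRODUCT_ID, so the
> primary key of the PG table has to be WINDOW_START/WINDOW_END plus
> PRODUCT_ID; otherwise the data cannot be written in upsert mode. That does
> not meet the business requirement, though: from the business side, the
> primary key should be RANK_ID + WINDOW_START/WINDOW_END. How can this be
> implemented in Flink?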
>
>
>
> Scenario: top 3 best-selling products
>
>
> Sample data:
> ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME
> 1,34,6005,4,2019-09-01 00:10:00
> 2,34,6003,1,2019-09-01 00:20:00
> 3,34,6005,4,2019-09-01 00:30:00
> 4,34,6006,3,2019-09-01 00:40:00
> 5,34,6001,6,2019-09-01 00:51:00
> 6,34,6005,1,2019-09-01 01:11:00
>
>
>
> The SQL logic is as follows:
> --source
> CREATE TABLE ORDER_DATA (
>         ORDER_ID VARCHAR,
>         USER_ID VARCHAR,
>         PRODUCT_ID VARCHAR,
>         NUM BIGINT,
>         ORDER_TIME TIMESTAMP(3),
>         WATERMARK FOR ORDER_TIME AS ORDER_TIME
> ) WITH (
>         'connector.type'='kafka',
>         'connector.version'='0.10',
>         'connector.topic'='orderData',
>         'connector.startup-mode'='latest-offset',
>         'connector.properties.zookeeper.connect'='xxxx:2181',
>         'connector.properties.bootstrap.servers'='xxxx:9092',
>         'connector.properties.group.id'='flink_sql',
>         'format.type'='csv',
>         'format.derive-schema'='true'
> );
>
>
> --sink
> CREATE TABLE PRODUCT_RANK (
>         RANK_ID BIGINT,
>         WINDOW_START TIMESTAMP(3),
>         WINDOW_END TIMESTAMP(3),
>         PRODUCT_ID VARCHAR,
>         TOTAL_NUM BIGINT
> ) WITH (
>         'connector.type'='jdbc',
>         'connector.url'='jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8',
>         'connector.driver'='org.postgresql.Driver',
>         'connector.table'='product_rank',
>         'connector.username'='xxxxx',
>         'connector.password'='xxxx',
>         'connector.write.flush.max-rows'='1'
> );
>
>
> INSERT INTO PRODUCT_RANK
>     SELECT RANK_ID, WINDOW_START, WINDOW_END, PRODUCT_ID, TOTAL_NUM
>         FROM (
>                 SELECT *,
>                 ROW_NUMBER() OVER (PARTITION BY WINDOW_START ORDER BY TOTAL_NUM DESC) AS RANK_ID
>                 FROM (
>                         SELECT
>                                 TUMBLE_START(ORDER_TIME, INTERVAL '1' HOUR) AS WINDOW_START,
>                                 TUMBLE_END(ORDER_TIME, INTERVAL '1' HOUR) AS WINDOW_END,
>                                 SUM(NUM) AS TOTAL_NUM,
>                                 PRODUCT_ID
>                         FROM ORDER_DATA
>                         GROUP BY TUMBLE(ORDER_TIME, INTERVAL '1' HOUR), PRODUCT_ID
>                 )
>         ) WHERE RANK_ID <= 3;
>
