flink-user-zh mailing list archives

From "Longfei Zhou" <zlfm...@vip.qq.com>
Subject Flink SQL: primary key problem when writing to a PostgreSQL database after GROUP BY
Date Wed, 01 Apr 2020 11:12:27 GMT
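Problem:
The SQL performs a GROUP BY aggregation over the time window and PRODUCT_ID, so the primary key of the PG table has to be WINDOW_START/WINDOW_END plus PRODUCT_ID; otherwise the data cannot be written out in upsert mode. This does not meet the business requirement, though: from the business side, RANK_ID + WINDOW_START/WINDOW_END should be the primary key. How can this requirement be implemented in Flink?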
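
To make the key question concrete, here is a minimal sketch of generic upsert semantics, with a plain Python dict standing in for a table with a primary key. It only illustrates why the choice of key matters; it does not model how Flink's JDBC sink derives its key or emits updates. The `upsert` helper and the two update rows are hypothetical (a rank-1 row that is later replaced by another product in the same window).

```python
# Illustrative only: a dict stands in for a table with a primary key.
# This models generic upsert semantics, not Flink's JDBC sink itself.

def upsert(table, key_fields, row):
    """Insert the row, or overwrite the existing row that has the same key."""
    key = tuple(row[f] for f in key_fields)
    table[key] = row

# Hypothetical sequence of two results for the same window:
# rank 1 is first held by product 6001, then taken over by product 6005.
updates = [
    {"rank_id": 1, "window_start": "2019-09-01 00:00:00",
     "product_id": "6001", "total_num": 6},
    {"rank_id": 1, "window_start": "2019-09-01 00:00:00",
     "product_id": "6005", "total_num": 8},
]

by_product = {}  # key: (window_start, product_id)
by_rank = {}     # key: (window_start, rank_id)
for row in updates:
    upsert(by_product, ("window_start", "product_id"), row)
    upsert(by_rank, ("window_start", "rank_id"), row)

print(len(by_product))  # two rows, both claiming rank 1
print(len(by_rank))     # one row: the current holder of rank 1
```

With the key on (window_start, rank_id) the second result overwrites the first, which is the behaviour the business requirement describes; with the key on (window_start, product_id) both rows remain unless the stale one is deleted separately.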



Scenario: Top 3 popular products


Sample data:
ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME
1,34,6005,4,2019-09-01 00:10:00
2,34,6003,1,2019-09-01 00:20:00
3,34,6005,4,2019-09-01 00:30:00
4,34,6006,3,2019-09-01 00:40:00
5,34,6001,6,2019-09-01 00:51:00
6,34,6005,1,2019-09-01 01:11:00



The SQL logic is as follows:
-- source: order events read from Kafka as CSV
CREATE TABLE ORDER_DATA (
	ORDER_ID VARCHAR,
	USER_ID VARCHAR,
	PRODUCT_ID VARCHAR,
	NUM BIGINT,
	-- the event-time attribute used by the watermark must be TIMESTAMP(3)
	ORDER_TIME TIMESTAMP(3),
	WATERMARK FOR ORDER_TIME AS ORDER_TIME
) WITH (
	'connector.type'='kafka',
	'connector.version'='0.10',
	'connector.topic'='orderData',
	'connector.start-mode'='latest-offset',
	'connector.properties.zookeeper.connect'='xxxx:2181',
	'connector.properties.bootstrap.servers'='xxxx:9092',
	'connector.properties.group.id'='flink_sql',
	'format.type'='csv',
	'format.derive-schema'='true'
);


-- sink: ranking results written to PostgreSQL over JDBC
CREATE TABLE PRODUCT_RANK (
	RANK_ID BIGINT,
	WINDOW_START TIMESTAMP(3),
	WINDOW_END TIMESTAMP(3),
	PRODUCT_ID VARCHAR,
	TOTAL_NUM BIGINT
) WITH (
	'connector.type'='jdbc',
	'connector.url'='jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8',
	'connector.driver'='org.postgresql.Driver',
	'connector.table'='product_rank',
	'connector.username'='xxxxx',
	'connector.password'='xxxx',
	-- flush after every row
	'connector.write.flush.max-rows'='1'
);


INSERT INTO PRODUCT_RANK
	SELECT RANK_ID,WINDOW_START,WINDOW_END,PRODUCT_ID,TOTAL_NUM
	FROM(
		-- rank the products inside each window by total quantity
		SELECT *,
		ROW_NUMBER() OVER (PARTITION BY WINDOW_START ORDER BY TOTAL_NUM DESC) AS RANK_ID
		FROM(
			-- total quantity per product per one-hour tumbling window
			SELECT
				TUMBLE_START(ORDER_TIME,INTERVAL '1' hour) AS WINDOW_START,
				TUMBLE_END(ORDER_TIME,INTERVAL '1' hour) AS WINDOW_END,
				SUM(NUM) AS TOTAL_NUM,
				PRODUCT_ID
			FROM ORDER_DATA
			GROUP BY TUMBLE(ORDER_TIME,INTERVAL '1' hour),PRODUCT_ID
		)
	) WHERE RANK_ID <=3;
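
As a cross-check of what the query is meant to produce, the following is a small batch sketch in plain Python that applies the same logic (one-hour tumbling windows, SUM(NUM) per product, top 3 per window) to the six sample rows above. It is not Flink code and ignores watermarks and streaming updates; the function name `top_n_per_hour` is made up for this illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# The sample rows from the post: ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME
SAMPLE = """\
1,34,6005,4,2019-09-01 00:10:00
2,34,6003,1,2019-09-01 00:20:00
3,34,6005,4,2019-09-01 00:30:00
4,34,6006,3,2019-09-01 00:40:00
5,34,6001,6,2019-09-01 00:51:00
6,34,6005,1,2019-09-01 01:11:00
"""

def top_n_per_hour(csv_text, n=3):
    """Return (rank_id, window_start, window_end, product_id, total_num) rows."""
    # Step 1: SUM(NUM) grouped by (one-hour window, PRODUCT_ID).
    totals = defaultdict(int)
    for line in csv_text.strip().splitlines():
        _order_id, _user_id, product_id, num, order_time = line.split(",")
        ts = datetime.strptime(order_time, "%Y-%m-%d %H:%M:%S")
        window_start = ts.replace(minute=0, second=0, microsecond=0)
        totals[(window_start, product_id)] += int(num)

    # Step 2: collect the per-product totals of each window.
    by_window = defaultdict(list)
    for (window_start, product_id), total in totals.items():
        by_window[window_start].append((product_id, total))

    # Step 3: rank by total descending and keep the first n rows per window.
    result = []
    for window_start in sorted(by_window):
        ranked = sorted(by_window[window_start], key=lambda p: p[1], reverse=True)
        for rank_id, (product_id, total) in enumerate(ranked[:n], start=1):
            window_end = window_start + timedelta(hours=1)
            result.append((rank_id, window_start, window_end, product_id, total))
    return result

rows = top_n_per_hour(SAMPLE)
for row in rows:
    print(row)
```

For the sample data, the 00:00 to 01:00 window ranks product 6005 (total 8), then 6001 (total 6), then 6006 (total 3), and the 01:00 to 02:00 window contains only 6005 (total 1). Each (WINDOW_START, RANK_ID) pair appears at most once in the output, which is why the post asks for that pair as the key.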
