flink-user mailing list archives

From 徐涛 <happydexu...@gmail.com>
Subject flink requires table key when insert into upsert table sink
Date Fri, 10 Aug 2018 15:06:55 GMT
Hi All,
I am using Flink 1.6 to build some real-time programs, and I want to write the output to a
table sink; the code is below. At first I used an append table sink, but its error message told
me that I should use an upsert table sink, so I wrote one. Now a different error comes out and
blocks me:

“Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a
full primary keys if it is updated.”

My question is: how can I set or modify the keys of a table in this scenario? I also checked the
exception stack trace and found that the system infers the key fields with
val tableKeys: Option[Array[String]] = UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan)
How can I make this function return a value for my query?
Thanks a lot!
    // hll is assumed to be a user-defined aggregate function registered elsewhere.
    val praise = tableEnv.sqlQuery(
      """SELECT article_id, hll(uid) AS PU FROM praise
        |GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id""".stripMargin)
    tableEnv.registerTable("praiseAggr", praise)

    val comment = tableEnv.sqlQuery(
      """SELECT article_id, hll(from_uid) AS CU FROM comment
        |GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id""".stripMargin)
    tableEnv.registerTable("commentAggr", comment)

    val reader = tableEnv.sqlQuery(
      """SELECT article_id, hll(uid) AS RU FROM reader
        |GROUP BY HOP(updated_time, INTERVAL '1' MINUTE, INTERVAL '3' DAY), article_id""".stripMargin)
    tableEnv.registerTable("readerAggr", reader)

    // sqlUpdate returns Unit, so there is no result to assign.
    tableEnv.sqlUpdate(
      s"""INSERT INTO $sinkTableName
         |SELECT p.article_id, p.PU, c.CU, r.RU
         |FROM praiseAggr p
         |FULL OUTER JOIN commentAggr c ON p.article_id = c.article_id
         |FULL OUTER JOIN readerAggr r ON c.article_id = r.article_id""".stripMargin)
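
To make the error message more concrete, here is a minimal sketch of why an upsert-style sink
needs a key. It is a toy model in plain Scala and uses no Flink classes: Row, Change and
applyChanges are hypothetical names invented for this illustration, and using article_id as the
key is an assumption, not something the planner inferred for the query above. The idea is only
that each change message has to identify which stored row it replaces or deletes.

```scala
// Toy model only: no Flink classes are used, and all names here are hypothetical.
object UpsertKeySketch {
  // One output row of the final query: article_id plus the three counts.
  final case class Row(articleId: String, pu: Long, cu: Long, ru: Long)

  // A change message: upsert = true means insert-or-update, false means delete.
  final case class Change(upsert: Boolean, row: Row)

  // In-memory stand-in for a sink. It can only apply a change because it
  // knows which field (articleId) identifies the row to overwrite or remove.
  def applyChanges(changes: Seq[Change]): Map[String, Row] =
    changes.foldLeft(Map.empty[String, Row]) { (state, c) =>
      if (c.upsert) state.updated(c.row.articleId, c.row)
      else state - c.row.articleId
    }

  val changes: Seq[Change] = Seq(
    Change(upsert = true, Row("a1", 1, 0, 0)),  // first result for a1
    Change(upsert = true, Row("a1", 1, 2, 0)),  // later result replaces it
    Change(upsert = true, Row("a2", 5, 0, 0)),  // first result for a2
    Change(upsert = false, Row("a2", 5, 0, 0))  // a2 is deleted again
  )

  val result: Map[String, Row] = applyChanges(changes)
}
```

Without a key field, the second message for a1 could only be appended as a new row, which is
presumably why the planner refuses to write an updating table without a unique key to an upsert
sink.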



Henry Xu