flink-user mailing list archives

From 徐涛 <happydexu...@gmail.com>
Subject Re: flink requires table key when insert into upsert table sink
Date Fri, 10 Aug 2018 22:02:25 GMT
Hi Fabian,
Could you give an example of a query that has a unique key?
By what mechanism does Flink infer which field(s) form the unique key?
Thanks a lot!

Best, Henry

> On Aug 11, 2018, at 5:21 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> 
> Hi Henry,
> 
> The problem is that the table that results from the query does not have a unique key.
> You can only use an upsert sink if the table has a (composite) unique key. Since this is not the case, you cannot use an upsert sink.
> However, you can implement a RetractStreamTableSink, which allows you to write any kind of Table (append-only/update, keyed/non-keyed) to an external system.
> 
> Best, Fabian
> 
> 2018-08-10 17:06 GMT+02:00 徐涛 <happydexutao@gmail.com>:
> Hi All,
> I am using Flink 1.6 to build some real-time programs. I want to write the output to a table sink; the code is below. At first I used an append table sink, but the error message told me that I should use an upsert table sink, so I wrote one. Now another error comes up and blocks me: "Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated." My question is: how do I modify a table's keys in this scenario? I also checked the exception stack and found that the system infers the key fields with
> val tableKeys: Option[Array[String]] = UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan)
> How can I make this function return a value?
> Thanks a lot!
>     var praise = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU FROM praise GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' DAY),article_id" )
>     tableEnv.registerTable("praiseAggr", praise)
> 
>     var comment = tableEnv.sqlQuery(s"SELECT article_id,hll(from_uid) as CU FROM comment GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' DAY),article_id" )
>     tableEnv.registerTable("commentAggr", comment)
> 
>     var reader = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as RU FROM reader GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' DAY),article_id" )
>     tableEnv.registerTable("readerAggr", reader)
> 
>     var finalTable = tableEnv.sqlUpdate(s"insert into "+ sinkTableName + " " +  " SELECT p.article_id,p.PU,c.CU,r.RU from praiseAggr p FULL OUTER JOIN commentAggr c on p.article_id=c.article_id FULL OUTER JOIN readerAggr r on c.article_id=r.article_id")
> 
> Thanks,
> Henry Xu
> 
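
As a rough illustration of the distinction drawn in Fabian's reply quoted above, the following self-contained Scala sketch models how a sink might apply change messages in retract mode versus upsert mode. It is a simplified model, not Flink code: `Row`, `applyRetract`, and `applyUpsert` are hypothetical names introduced here. The only thing taken from Flink is the convention that both sink types receive (Boolean, record) pairs, where `true` marks an add or upsert and `false` marks a retraction or delete.

```scala
// Simplified model of sink semantics; none of these names are Flink APIs.
final case class Row(articleId: String, pu: Long)

object SinkSemanticsSketch {
  // Retract mode: true = add this row, false = remove one earlier copy of it.
  // No key is required, because the full row identifies what to remove.
  def applyRetract(store: Vector[Row], msg: (Boolean, Row)): Vector[Row] = msg match {
    case (true, row) => store :+ row
    case (false, row) =>
      val i = store.indexOf(row)
      if (i >= 0) store.patch(i, Nil, 1) else store
  }

  // Upsert mode: true = insert or overwrite by key, false = delete by key.
  // This only works if every row carries a unique key (articleId here).
  def applyUpsert(store: Map[String, Row], msg: (Boolean, Row)): Map[String, Row] = msg match {
    case (true, row)  => store.updated(row.articleId, row)
    case (false, row) => store - row.articleId
  }

  def main(args: Array[String]): Unit = {
    // An update from PU=1 to PU=2 needs a retraction plus an add in retract mode ...
    val retractMsgs = Seq((true, Row("a1", 1L)), (false, Row("a1", 1L)), (true, Row("a1", 2L)))
    // ... but only a second keyed write in upsert mode.
    val upsertMsgs = Seq((true, Row("a1", 1L)), (true, Row("a1", 2L)))

    println(retractMsgs.foldLeft(Vector.empty[Row])(applyRetract))
    println(upsertMsgs.foldLeft(Map.empty[String, Row])(applyUpsert))
  }
}
```

Both folds end with a single row for `a1` holding the value 2. The difference is that the upsert path needs `articleId` to locate the row to overwrite, which is why an upsert sink cannot accept a table without a unique key, while the retract path works on whole rows.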

