flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Leonard Xu <xbjt...@gmail.com>
Subject Re: Flip-105 can the debezium/canal SQL sink to database directly?
Date Tue, 30 Jun 2020 02:23:10 GMT
HI Lei,
Jingsong is wright, you need define a primary key for your sink table.
BTW, Flink use `PRIMARY KEY NOT ENFORCED` to define primary key because Flink doesn’t own
data and only supports `NOT ENFORCED` mode, it’s a little bit different with the primary
key  in DB which is default `ENFORCED` , both  `ENFORCED ` and `NOT ENFORCED` are supported
in SQL standard.
You can look up[1][2] for more details.

Best,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
<https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table>
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
<https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table>

> 在 2020年6月30日,10:08,Jingsong Li <jingsonglee0@gmail.com> 写道:
> 
> Hi Lei,
> 
> INSERT INTO  jdbc_table SELECT * FROM changelog_table;
> 
> For Flink 1.11 new connectors, you need to define the primary key for jdbc_table (and
also your mysql table needs to have the corresponding primary key) because changelog_table
has the "update", "delete" records.
> 
> And then, jdbc sink will:
> - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to deal with "insert"
and "update" messages.
> - delete to deal with "delete" messages.
> 
> So generally speaking, with the primary key, this mysql table will be the same to your
source database table. (table for generating changelog)
> 
> Best,
> Jingsong
> 
> On Tue, Jun 30, 2020 at 9:58 AM wanglei2@geekplus.com.cn <mailto:wanglei2@geekplus.com.cn>
<wanglei2@geekplus.com.cn <mailto:wanglei2@geekplus.com.cn>> wrote:
> 
> CREATE TABLE my_table (
>   id BIGINT,
>   first_name STRING,
>   last_name STRING,
>   email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;
> 
> What will happen after  i execute the insert sql statement? For the update/delete message
from kafka, the corresponding record will be updated or deleted in the mysql_sink_table?
> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table; 
> 
> Thanks,
> Lei
> 
> wanglei2@geekplus.com.cn <mailto:wanglei2@geekplus.com.cn> 
> 
> 
> 
> -- 
> Best, Jingsong Lee


Mime
View raw message