flink-user mailing list archives

From "wanglei2@geekplus.com.cn" <wangl...@geekplus.com.cn>
Subject Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?
Date Tue, 30 Jun 2020 02:25:39 GMT
Thanks Jingsong,

Is there any documentation or an example for this?
I will build the flink-1.11 package and give it a try.

Thanks,
Lei  



wanglei2@geekplus.com.cn 
 
From: Jingsong Li
Date: 2020-06-30 10:08
To: wanglei2@geekplus.com.cn
CC: user
Subject: Re: Flip-105 can the debezium/canal SQL sink to database directly?
Hi Lei,

INSERT INTO jdbc_table SELECT * FROM changelog_table;

For the new connectors in Flink 1.11, you need to define a primary key on jdbc_table (and
your MySQL table needs to have the corresponding primary key), because changelog_table
contains "update" and "delete" records.
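
A minimal sketch of what such a sink definition could look like for the mysql_sink_table
from Lei's question, assuming the Flink 1.11 JDBC connector. The URL, database name (mydb),
physical table name (users) and credentials are placeholders, not values from this thread.

```sql
-- Hypothetical sink definition; all connection values are placeholders.
CREATE TABLE mysql_sink_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  -- Flink does not validate the key itself; declaring it lets the sink write in upsert mode.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'users',
  'username' = 'flink_user',
  'password' = 'change_me'
);
```

The MySQL table behind it would also need id declared as its primary key (or a unique key),
otherwise the upsert has no key to match on.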

The JDBC sink will then:
- insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ..." to handle "insert"
and "update" messages.
- issue a delete to handle "delete" messages.
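
Roughly, and assuming a MySQL table users(id, first_name, last_name) with id as its primary
key (a hypothetical table, not one named in this thread), the statements applied per
changelog record would have the following shape. The exact text generated by the connector
may differ.

```sql
-- "insert" and "update" records: upsert keyed on the primary key
INSERT INTO users (id, first_name, last_name)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
  first_name = VALUES(first_name),
  last_name  = VALUES(last_name);

-- "delete" records: remove the row by primary key
DELETE FROM users WHERE id = ?;
```

Because both statements are keyed on id, replaying the same changelog record twice leaves
the table in the same state.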

So generally speaking, with the primary key in place, this MySQL table will hold the same
data as your source database table (the table that generates the changelog).

Best,
Jingsong

On Tue, Jun 30, 2020 at 9:58 AM wanglei2@geekplus.com.cn <wanglei2@geekplus.com.cn>
wrote:

CREATE TABLE my_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);
INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;

What will happen after I execute the INSERT statement? For update/delete messages
from Kafka, will the corresponding record be updated or deleted in mysql_sink_table?

Thanks,
Lei



wanglei2@geekplus.com.cn 



-- 
Best, Jingsong Lee