Jingsong is wright, you need define a primary key for your sink table.
BTW, Flink use `PRIMARY KEY NOT ENFORCED` to define primary key because Flink doesn’t own data and only supports `NOT ENFORCED` mode, it’s a little bit different with the primary key  in DB which is default `ENFORCED` , both  `ENFORCED ` and `NOT ENFORCED` are supported in SQL standard.
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table

INSERT INTO  jdbc_table SELECT * FROM changelog_table;

For Flink 1.11 new connectors, you need to define the primary key for jdbc_table (and also your mysql table needs to have the corresponding primary key) because changelog_table has the "update", "delete" records.

And then, jdbc sink will:
- insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to deal with "insert" and "update" messages.
- delete to deal with "delete" messages.

So generally speaking, with the primary key, this mysql table will be the same to your source database table. (table for generating changelog)


CREATE TABLE my_table (
  id BIGINT,
first_name STRING,
last_name STRING,
email STRING
) WITH (

INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

What will happen after  i execute the insert sql statement? For the update/delete message from kafka, the corresponding record will be updated or deleted in the mysql_sink_table?

