flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Qi Luo <luoqi...@gmail.com>
Subject Re: 关于flink中端对端的精确一次性理解问题
Date Wed, 28 Aug 2019 11:19:32 GMT
对于MySQL sink来说,使用2PC我理解应该是不能用MySQL
transaction的。因为如果你在preCommit中(或之前)开启了transaction,任务失败的话数据会直接丢失,没法实现2PC里preCommit成功后必须保证commit成功的语义。一种办法是preCommit时写入mysql临时表,在commit时将临时表数据移动入正式表。

On Wed, Aug 28, 2019 at 2:47 PM 1900 <575209351@qq.com> wrote:

> hi,
>
>
> 看了解释,还是觉得比较抽象,能不能具体点,下面贴下之前一位同学写的实现,总感觉不对,但好像也有点意思,能不能结合代码解释下
>
>
> public class Sink extends TwoPhaseCommitSinkFunction <ObjectNode,
> Connection, Void> {
>
>
> //    private Connection connection;
>
>
>     public Sink() {
>         super(new KryoSerializer <>(Connection.class , new
> ExecutionConfig()) , VoidSerializer.INSTANCE);
>     }
>
>
>     @Override
>     protected void invoke(Connection connection , ObjectNode objectNode ,
> Context context) throws Exception {
>         String  stu     = objectNode.get("value").toString();
>         Student student = JSON.parseObject(stu , Student.class);
>
>
>         System.err.println("start invoke......." + "id = " +
> student.getId() + "  name = " + student.getName() + "   password"
>                            + " = " + student.getPassword() + "  age = " +
> student.getAge());
>
>
>         String            sql = "insert into Student(id,name,password,age)
> values (?,?,?,?)";
>         PreparedStatement ps  = connection.prepareStatement(sql);
>         ps.setInt(1 , student.getId());
>         ps.setString(2 , student.getName());
>         ps.setString(3 , student.getPassword());
>         ps.setInt(4 , student.getAge());
>         ps.executeUpdate();
>         //手动制造异常
>         if (student.getId() == 33) {
>             System.out.println(1 / 0);
>         }
>     }
>
>
>     @Override
>     protected Connection beginTransaction() throws Exception {
>         String url = "jdbc:mysql:";
>         return DBConnectUtil.getConnection(url , "" , "");
>     }
>
>
>     @Override
>     protected void preCommit(Connection connection) throws Exception {
>     }
>
>
>     @Override
>     protected void commit(Connection connection) {
>         if (connection != null) {
>             try {
>                 connection.commit();
>             } catch (SQLException e) {
>                 System.err.println("commit  error ............" +
> e.getMessage());
>             } finally {
>                 try {
>                     connection.close();
>                 } catch (SQLException e) {
>                     System.err.println(" finally  commit error
> ............" + e.getMessage());
>                 }
>             }
>         }
>     }
>
>
>     @Override
>     protected void abort(Connection connection) {
>         if (connection != null) {
>             try {
>                 connection.rollback();
>             } catch (SQLException e) {
>                 System.err.println("abort error ............" +
> e.getMessage());
>             } finally {
>                 try {
>                     connection.close();
>                 } catch (SQLException e) {
>                     System.err.println(" finally  abort error
> ............" + e.getMessage());
>                 }
>             }
>         }
>     }
> }
>
>
>
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人: "Yun Tang"<myasuka@live.com>;
> 发送时间: 2019年8月27日(星期二) 下午3:00
> 收件人: "user-zh"<user-zh@flink.apache.org>;
>
> 主题: Re: 关于flink中端对端的精确一次性理解问题
>
>
>
> Hi
>
>
> 可以看一下TwoPhaseCommitSinkFunction的实现,preCommit是在snapshotState时调用,会将当前的currentTransactionHolder存储到pendingCommitTransactions,直到notifyCheckpointComplete时(也就是commit时),将pendingCommitTransactions取出进行事务性操作。所以preCommit时候不需要写是不符合语义的。
>
> 如果借助TwoPhaseCommitSinkFunction,确实需要适当减少checkpoint
> interval,否则可能很久都没有输出,但是不建议在state较大时设置成秒级。
>
> 祝好
> 唐云
> ________________________________
> From: 1900 <575209351@qq.com>
> Sent: Tuesday, August 27, 2019 14:15
> To: user-zh <user-zh@flink.apache.org>
> Subject: 关于flink中端对端的精确一次性理解问题
>
>
> 根据flink端对端的定义,采用二阶段提交,flink中已经封装好了,放到TwoPhaseCommitSinkFunction中,只要实现对应的方法就行,
>
>
>
> – beginTransaction
>
> – preCommit
>
> – commit
>
> – abort
>
>
>
> 如果sink端的操作是幂等的,就不需要实现了,假设现在我要实现写入MYSQL,实现端对端精确一次性,这种情况下是不是
preCommit不需要写,
>
> 然后最终commit再提交事务?这样事务的打开时间就跟checkpoint的时间关联了,假设checkpoint时间很长,是不是意味着事务打开时间很长,有风险;
> 而且如果checkpoint要持续的进行着(原来假设每分钟一次checkpoint,现在要每秒一次?),不能设置间隔时间?
> 或者是间隔时间都算到下次checkpoint中,这样事务的打开时间更长了,请问是不是这样理解?
> 这样端对端精确一次性是不是效果很小?
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message