flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "1900" <575209...@qq.com>
Subject 回复: 关于flink中端对端的精确一次性理解问题
Date Wed, 28 Aug 2019 06:47:14 GMT
hi,


看了解释,还是觉得比较抽象,能不能具体点,下面贴下之前一位同学写的实现,总感觉不对,但好像也有点意思,能不能结合代码解释下


public class Sink extends TwoPhaseCommitSinkFunction <ObjectNode, Connection, Void>
{


//    private Connection connection;


    public Sink() {
        super(new KryoSerializer <>(Connection.class , new ExecutionConfig()) , VoidSerializer.INSTANCE);
    }


    @Override
    protected void invoke(Connection connection , ObjectNode objectNode , Context context)
throws Exception {
        String  stu     = objectNode.get("value").toString();
        Student student = JSON.parseObject(stu , Student.class);


        System.err.println("start invoke......." + "id = " + student.getId() + "  name = "
+ student.getName() + "   password"
                           + " = " + student.getPassword() + "  age = " + student.getAge());


        String            sql = "insert into Student(id,name,password,age) values (?,?,?,?)";
        PreparedStatement ps  = connection.prepareStatement(sql);
        ps.setInt(1 , student.getId());
        ps.setString(2 , student.getName());
        ps.setString(3 , student.getPassword());
        ps.setInt(4 , student.getAge());
        ps.executeUpdate();
        //手动制造异常
        if (student.getId() == 33) {
            System.out.println(1 / 0);
        }
    }


    @Override
    protected Connection beginTransaction() throws Exception {
        String url = "jdbc:mysql:";
        return DBConnectUtil.getConnection(url , "" , "");
    }


    @Override
    protected void preCommit(Connection connection) throws Exception {
    }


    @Override
    protected void commit(Connection connection) {
        if (connection != null) {
            try {
                connection.commit();
            } catch (SQLException e) {
                System.err.println("commit  error ............" + e.getMessage());
            } finally {
                try {
                    connection.close();
                } catch (SQLException e) {
                    System.err.println(" finally  commit error ............" + e.getMessage());
                }
            }
        }
    }


    @Override
    protected void abort(Connection connection) {
        if (connection != null) {
            try {
                connection.rollback();
            } catch (SQLException e) {
                System.err.println("abort error ............" + e.getMessage());
            } finally {
                try {
                    connection.close();
                } catch (SQLException e) {
                    System.err.println(" finally  abort error ............" + e.getMessage());
                }
            }
        }
    }
}







------------------ 原始邮件 ------------------
发件人: "Yun Tang"<myasuka@live.com>;
发送时间: 2019年8月27日(星期二) 下午3:00
收件人: "user-zh"<user-zh@flink.apache.org>;

主题: Re: 关于flink中端对端的精确一次性理解问题



Hi

可以看一下TwoPhaseCommitSinkFunction的实现,preCommit是在snapshotState时调用,会将当前的currentTransactionHolder存储到pendingCommitTransactions,直到notifyCheckpointComplete时(也就是commit时),将pendingCommitTransactions取出进行事务性操作。所以preCommit时候不需要写是不符合语义的。

如果借助TwoPhaseCommitSinkFunction,确实需要适当减少checkpoint interval,否则可能很久都没有输出,但是不建议在state较大时设置成秒级。

祝好
唐云
________________________________
From: 1900 <575209351@qq.com>
Sent: Tuesday, August 27, 2019 14:15
To: user-zh <user-zh@flink.apache.org>
Subject: 关于flink中端对端的精确一次性理解问题

根据flink端对端的定义,采用二阶段提交,flink中已经封装好了,放到TwoPhaseCommitSinkFunction中,只要实现对应的方法就行,



– beginTransaction

– preCommit

– commit

– abort



如果sink端的操作是幂等的,就不需要实现了,假设现在我要实现写入MYSQL,实现端对端精确一次性,这种情况下是不是
preCommit不需要写,
然后最终commit再提交事务?这样事务的打开时间就跟checkpoint的时间关联了,假设checkpoint时间很长,是不是意味着事务打开时间很长,有风险;
而且如果checkpoint要持续的进行着(原来假设每分钟一次checkpoint,现在要每秒一次?),不能设置间隔时间?
或者是间隔时间都算到下次checkpoint中,这样事务的打开时间更长了,请问是不是这样理解?
这样端对端精确一次性是不是效果很小?
Mime
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message