flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "出发" <573693...@qq.com>
Subject 回复:Flink 1.10 的 JDBCUpsertOutputFormat flush方法的重试机制无法生效
Date Mon, 23 Mar 2020 06:36:43 GMT
可以建议在计算层那里,获取连接,用完之后手动close,open只是负责初始化连接池。




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"lucas.wu"<lucas.wu@xiaoying.com&gt;;
发送时间:&nbsp;2020年3月23日(星期一) 下午2:34
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复:Flink 1.10 的 JDBCUpsertOutputFormat flush方法的重试机制无法生效



hi ,我们之前写mysql遇到过这种问题,因为任务的open函数是任务启动的时候调用的,是只调用一次的。而open函数里面会对jdbc
connection进行初始化,当jdbc conection因为各种原因断开的时候,例如空闲时间超过max_idel_timeout。这都会导致flush失败,进而导致整个task重启。所以我们后面参照官方的这个JDBCUpsertOutputFormat
自己写了一个ouputFormat,加上了连接的检查和重连,如果对一致性要求不高的话,还可以对flush进行异常捕捉。


原始邮件
发件人:shangwen583767126@qq.com
收件人:user-zhuser-zh@flink.apache.org
抄送:kevin.shangwenkevin.shangwen@gmail.com
发送时间:2020年3月23日(周一) 11:05
主题:Flink 1.10 的 JDBCUpsertOutputFormat flush方法的重试机制无法生效


我们在测试环境测试JDBC写入postgresql的场景,用tcpkill模拟链接被关闭的情况,测试对异常的兼容性,我们发现一直打印类似的日志
2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormatnbsp;
- JDBC executeBatch error, retry times = 1 org.postgresql.util.PSQLException: This connection
has been closed. at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857) at
org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817) at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873) at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) 从日志上看来,链接被关闭,只会导致重试一次,这里是Flink的重试代码
//JDBCUpsertOutputFormat.javapublic synchronized void flush() throws Exception { checkFlushException();
for (int i = 1; i = maxRetryTimes; i++) { try { jdbcWriter.executeBatch(); batchCount = 0;
break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i,
e); if (i gt;= maxRetryTimes) { throw e; } Thread.sleep(1000 * i); } } } 通过远程debug分析,在第一次执行
JDBCUpsertOutputFormat.flush nbsp; -gt; AppendOnlyWriter.executeBatch nbsp; nbsp; nbsp;...
nbsp; nbsp; nbsp;-gt; PgConnection.getAutoCommit 抛出PSQLException: This connection has
been closed时,batchStatements在这之前已经被清空 // PgStatement.java private BatchResultHandler
internalExecuteBatch() throws SQLException { // Construct query/parameter arrays. transformQueriesAndParameters();
// Empty arrays should be passed to toArray // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
Query[] queries = batchStatements.toArray(new Query[0]); ParameterList[] parameterLists =
batchParameters.toArray(new ParameterList[0]); batchStatements.clear(); // 这里已经被清空
batchParameters.clear(); ... if (connection.getAutoCommit()) { // 抛出异常 flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
} ... } 所以在Flink第二次重试执行这段代码的时候jdbcWriter.executeBatch的时候,batchStatements已经为Empty并直接返回了,导致Flink认为是成功的,这样会导致其实语句没有被执行?
// PgStatement.java public int[] executeBatch() throws SQLException { checkClosed(); closeForNextExecution();
if (batchStatements == null || batchStatements.isEmpty()) { //这里就直接返回了 return
new int[0]; } return internalExecuteBatch().getUpdateCount(); } 目前的想法是,当我出现异常捕获的时候,在重试之前,判断这个链接是否已经被关闭,如果已经被关闭,则重新open,不知道大家有啥看法,这是我提的issue
https://issues.apache.org/jira/browse/FLINK-16708
Mime
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message