flink-user mailing list archives

From "Yun Gao" <yungao...@aliyun.com>
Subject Re: Flink Async io problem
Date Tue, 09 Jul 2019 12:06:56 GMT
Hi Venn,

     I think `AsyncFunction#asyncInvoke` should only submit an asynchronous task for each
input record, not execute the task itself. However, it seems that in your code fragment
the query is executed directly in the asyncInvoke method, so the call blocks until the
query returns.

    You may also find more information on the documentation page [1]. One point worth
noting is that in the example on that page, the call to `client#query` returns a Future,
so it is an asynchronous action rather than a direct execution of the query.

   [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
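
   To make the difference concrete, here is a minimal sketch in plain Java, without any
Flink dependency. The names `AsyncLookupSketch`, `blockingLookup`, and the `Consumer`
callback are hypothetical stand-ins (for the JDBC query and for Flink's `ResultFuture`),
not part of the original code. The only point is that the blocking call runs inside the
submitted task, so the method itself returns immediately.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

final class AsyncLookupSketch {

    // Hypothetical stand-in for a blocking database query such as ps.executeQuery().
    static String blockingLookup(String id) {
        try {
            Thread.sleep(200); // simulate query latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "phone-" + id;
    }

    // Same shape as asyncInvoke: it only submits the work and returns at once.
    // The callback plays the role of resultFuture.complete(...).
    static void asyncInvoke(String id, Executor pool, Consumer<Collection<String>> resultFuture) {
        CompletableFuture
            // The blocking call happens on a pool thread, not on the caller's thread.
            .supplyAsync(() -> blockingLookup(id), pool)
            .thenAccept(phone -> resultFuture.accept(Collections.singleton(phone)));
    }
}
```

   In a real `RichAsyncFunction`, the same idea would mean moving the
`ps.executeQuery()` call inside the supplier and finishing with
`resultFuture.complete(...)`. Note that a JDBC connection or statement is typically not
safe to share between threads, so each pool thread would likely need its own connection.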
Best,
Yun


------------------------------------------------------------------
From:venn <wxchunjhyy@163.com>
Send Time:2019 Jul. 9 (Tue.) 19:54
To:user <user@flink.apache.org>
Subject:Flink Async io problem


Hi Flink experts,
            I'm working on a Flink async I/O program that joins a stream with an external
database (MySQL), but I found that it runs synchronously. Please give me some advice or
point me to an async demo. Thanks.

The asyncInvoke method is as follows:


@Override
public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {
    // Query using the asyncUser id
    ps.setString(1, asyncUser.getId());
    ResultSet rs = ps.executeQuery();

    CompletableFuture.supplyAsync(new Supplier<AsyncUser>() {
        @Override
        public AsyncUser get() {
            try {
                if (!rs.isClosed() && rs.next()) {
                    asyncUser.setPhone(rs.getString(1));
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return asyncUser;
        }
    }).thenAccept((AsyncUser tmp) -> {
        List<AsyncUser> list = new ArrayList<>();
        list.add(tmp);
        resultFuture.complete(list);
    });
}



Best, Venn
