flink-user-zh mailing list archives

From "曾耀武" <513797...@qq.com>
Subject flink 1.6.1 RichAsyncFunction: asynchronous requests to an external system cause massive duplication of downstream data
Date Sun, 13 Oct 2019 04:55:20 GMT
Hi all,


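I have a question. While using Flink 1.6 to access an external system asynchronously, I found that downstream records that should appear only once are being duplicated dozens of times.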


Here is a snippet of my async function:


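import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ExecutorUtils;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

private static class SimpleAsyncFunction extends RichAsyncFunction<String, String> {
    private static final long serialVersionUID = 2098635244857937717L;

    private transient ExecutorService executorService;
    private transient Client client; // my own client for the external system

    private final long shutdownWaitTS = 1000;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // 10 core threads, at most 30, bounded queue of 1000;
        // when saturated, the submitting thread runs the task itself
        executorService = new ThreadPoolExecutor(10, 30, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        client = new Client();
    }

    @Override
    public void close() throws Exception {
        super.close();
        ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
    }

    @Override
    public void asyncInvoke(final String jsonStr, final ResultFuture<String> resultFuture) {
        // Note: predict() is called synchronously on the calling task thread here;
        // the executorService created in open() is never used.
        String result = client.predict(jsonStr);
        resultFuture.complete(Collections.singletonList(result));
    }
}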
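For comparison, the usual pattern for a blocking client inside an async function is to hand the call to a thread pool and complete the result future from the callback, so `asyncInvoke` returns immediately. The sketch below is plain JDK code, not Flink: `predict` is a hypothetical stand-in for `client.predict`, and the two `Consumer` callbacks stand in for `ResultFuture.complete(...)` and `ResultFuture.completeExceptionally(...)`. It assumes `predict` is a blocking call and that each input must be completed exactly once.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class AsyncOffloadSketch {
    // Hypothetical stand-in for the blocking Client.predict() call.
    static String predict(String jsonStr) {
        return "predicted:" + jsonStr;
    }

    // Stand-in for asyncInvoke: returns right away; the blocking call runs on `pool`,
    // and exactly one of the two callbacks is invoked when it finishes.
    static void asyncInvoke(String jsonStr, ExecutorService pool,
                            Consumer<List<String>> onComplete,
                            Consumer<Throwable> onError) {
        CompletableFuture
                .supplyAsync(() -> predict(jsonStr), pool)
                .whenComplete((result, error) -> {
                    if (error != null) {
                        onError.accept(error);
                    } else {
                        onComplete.accept(Collections.singletonList(result));
                    }
                });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(3);
        List<String> collected = new CopyOnWriteArrayList<>();
        for (String s : new String[] {"a", "b", "c"}) {
            asyncInvoke(s, pool,
                    out -> { collected.addAll(out); done.countDown(); },
                    err -> done.countDown());
        }
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        System.out.println(collected);
    }
}
```

In a real `RichAsyncFunction`, the pool would be the `executorService` created in `open()`, and the callbacks would call the `resultFuture` passed into `asyncInvoke`.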
------------------------------
The DAG construction part:
AsyncFunction<String, String> nlpMoaAsyncFunction = new SimpleAsyncFunction();

DataStream<String> source = env.addSource(flinkKafkaConsumer010);

DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
        source,
        nlpMoaAsyncFunction,
        timeout,
        TimeUnit.MILLISECONDS,
        30);

FlinkKafkaProducer010<String> kafkaProducer = new FlinkKafkaProducer010<String>(
        ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
        new SimpleStringSchema(),
        producerProp
);

nlpResult.addSink(kafkaProducer);
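To make the `unorderedWait(..., 30)` arguments concrete, here is a rough JDK-only analogy, not Flink's implementation: at most `capacity` requests (30 above) are in flight at once, the submitting thread blocks when that limit is reached, and each result is emitted in completion order rather than input order. The class and method names are hypothetical, and timeouts are deliberately left out of the sketch.

```java
import java.util.*;
import java.util.concurrent.*;

public class UnorderedCapacitySketch {
    // Emits one result per input, in completion order,
    // with at most `capacity` requests in flight at any time.
    static List<String> run(List<String> inputs, int capacity, ExecutorService pool)
            throws InterruptedException {
        Semaphore inFlight = new Semaphore(capacity);
        BlockingQueue<String> emitted = new LinkedBlockingQueue<>();
        CountDownLatch remaining = new CountDownLatch(inputs.size());
        for (String in : inputs) {
            inFlight.acquire(); // blocks the submitting thread once capacity is reached
            CompletableFuture.supplyAsync(() -> "result:" + in, pool)
                    .whenComplete((r, e) -> {
                        if (e == null) {
                            emitted.add(r); // appended in whatever order requests finish
                        }
                        inFlight.release();
                        remaining.countDown();
                    });
        }
        remaining.await();
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> out = run(Arrays.asList("a", "b", "c", "d", "e"), 2, pool);
        pool.shutdown();
        System.out.println(out);
    }
}
```

In this analogy each input produces exactly one emitted result. The order of `out` may differ between runs because completion order is not fixed.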

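-------------------------
The whole logic is really simple. The job runs on YARN with 10 TaskManagers, one slot per TaskManager. Has anyone run into a similar problem? Is this a bug, or is something wrong with my implementation?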