flink-user-zh mailing list archives

From butnet <but...@163.com>
Subject Why is a Flink sink's close method called repeatedly when open is not called again? Aren't they supposed to be paired?
Date Fri, 21 Feb 2020 12:43:01 GMT
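Hi all,
Why is the close() method of my Flink sink executed repeatedly while open() is not called again? Aren't the two supposed to come in pairs?
Below are the sink implementation and the logs. The sink writes to a database asynchronously, and I log a message in both open() and close().
The logs show that open() is called only once, followed by a large number of close() calls. What causes this? Shouldn't they come in pairs?
Environment: JDK 8, Flink 1.8.2
Flink is started in standard cluster mode (./start-cluster.sh).
Thanks, everyone.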



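import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// CounterRichSinkFunction, MySqlUpsertor, BatchThread and DataBaseUtil are
// project classes that are not shown in this message.
public class MySqlSink<T> extends CounterRichSinkFunction<T> {
    private static final Logger log = LoggerFactory.getLogger(MySqlSink.class);

    private final MySqlUpsertor<T> upsertor;
    // Created in open(), so they are not serialized with the function.
    private transient volatile BatchThread<T> batchThread;
    private transient volatile DataBaseUtil dataBaseUtil;
    private final String name;

    public MySqlSink(String name) {
        this(null, name);
    }

    public MySqlSink(MySqlUpsertor<T> upsertor, String name) {
        this.upsertor = upsertor;
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        try {
            this.counter.inc();
            // Hand the record to the background thread that batches the writes.
            batchThread.push(value);
        } catch (Throwable e) {
            this.counterError.inc();
            // Message: "async output error"
            log.info("异步输出异常: {}@{} {}", name, hashCode(), e.toString(), e);
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Message: "async output <name>@<hashCode> open"
        log.info("异步输出 {}@{} open", name, hashCode());
        super.open(parameters);
        try {
            if (dataBaseUtil == null) {
                dataBaseUtil = DataBaseUtil.getInstance();
            }
            if (batchThread == null) {
                // The last argument is a callback that clears the field.
                batchThread = new BatchThread<>(getRuntimeContext(), dataBaseUtil, upsertor, name,
                        () -> batchThread = null);
                batchThread.start();
            }
        } catch (Throwable e) {
            // Message: "error creating the output thread"
            log.info("创建异常输出线程异常: {}@{} {}", name, hashCode(), e.toString(), e);
        }
    }

    @Override
    public void close() throws Exception {
        // Message: "async output <name>@<hashCode> close"
        // Only logs and delegates; the BatchThread started in open() is not stopped here.
        log.info("异步输出 {}@{} close", name, hashCode());
        super.close();
    }
}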
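
Editor's sketch, not part of the original message: whatever triggers the extra calls, one way to narrow it down is to make close() idempotent and print the call stack of every redundant call, so the caller becomes visible. GuardedLifecycle below is a hypothetical plain-Java stand-in for the sink (it does not use any Flink API), and the counters exist only for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a sink; this is not a Flink class. */
class GuardedLifecycle {
    private final String name;
    // True between open() and the first close() that follows it.
    private final AtomicBoolean opened = new AtomicBoolean(false);
    private final AtomicInteger closeCalls = new AtomicInteger();
    private final AtomicInteger cleanups = new AtomicInteger();

    GuardedLifecycle(String name) {
        this.name = name;
    }

    void open() {
        opened.set(true);
    }

    void close() {
        int n = closeCalls.incrementAndGet();
        // compareAndSet lets exactly one caller perform the cleanup.
        if (!opened.compareAndSet(true, false)) {
            // Redundant call: dump the stack so its origin can be inspected.
            new Throwable(name + ": redundant close() call #" + n).printStackTrace();
            return;
        }
        cleanups.incrementAndGet(); // real resource cleanup would go here
    }

    int closeCalls() {
        return closeCalls.get();
    }

    int cleanups() {
        return cleanups.get();
    }
}

class GuardedLifecycleDemo {
    public static void main(String[] args) {
        GuardedLifecycle sink = new GuardedLifecycle("ng-url");
        sink.open();
        sink.close();
        sink.close(); // stack trace goes to stderr
        sink.close(); // stack trace goes to stderr
        // prints "close() calls: 3, cleanups: 1"
        System.out.println("close() calls: " + sink.closeCalls() + ", cleanups: " + sink.cleanups());
    }
}
```

In a real sink the same guard would wrap the body of close(), and the stack traces of the redundant calls should show whether they come from the framework or from the job's own classes.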



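Logs (messages are shown as emitted by the job: "异步输出" means "async output", "未配置 ... 参数" means "parameter ... is not configured"):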
2020-02-21 20:29:49 [           main] INFO [util.FlinkUtil            ] windowTime: 60000, paramWaterInterval: 40000
2020-02-21 20:29:50 [           main] WARN [job.alarm.LimitAlarmJob   ] 未配置 activityNgKafka 参数
2020-02-21 20:29:50 [           main] WARN [job.alarm.LimitAlarmJob   ] 未配置 remoteKafka 参数
2020-02-21 20:29:52 [: ng-host (3/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@97861042 open
2020-02-21 20:29:52 [k: ng-url (3/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@1582284274 open
2020-02-21 20:29:52 [: ng-host (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@1255075694 open
2020-02-21 20:29:52 [: ng-host (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@971447764 open
2020-02-21 20:29:52 [k: ng-url (2/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@793060686 open
2020-02-21 20:29:52 [k: ng-url (5/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@275482810 open
2020-02-21 20:29:52 [k: ng-url (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@543530928 open
2020-02-21 20:29:52 [: ng-host (2/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@989133776 open
2020-02-21 20:29:52 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 open
2020-02-21 20:29:52 [: ng-host (5/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@268528771 open
2020-02-21 20:29:52 [        ng-host] INFO [common.BatchThread        ] BatchThread ng-host-1 start
2020-02-21 20:29:52 [        ng-host] INFO [common.BatchThread        ] BatchThread ng-host-4 start
2020-02-21 20:29:52 [        ng-host] INFO [common.BatchThread        ] BatchThread ng-host-2 start
2020-02-21 20:29:52 [         ng-url] INFO [common.BatchThread        ] BatchThread ng-url-3 start
2020-02-21 20:29:52 [         ng-url] INFO [common.BatchThread        ] BatchThread ng-url-5 start
2020-02-21 20:29:52 [         ng-url] INFO [common.BatchThread        ] BatchThread ng-url-6 start
2020-02-21 20:29:52 [         ng-url] INFO [common.BatchThread        ] BatchThread ng-url-7 start
2020-02-21 20:29:52 [        ng-host] INFO [common.BatchThread        ] BatchThread ng-host-8 start
2020-02-21 20:29:52 [         ng-url] INFO [common.BatchThread        ] BatchThread ng-url-9 start
2020-02-21 20:29:52 [        ng-host] INFO [common.BatchThread        ] BatchThread ng-host-10 start
2020-02-21 20:30:41 [k: ng-url (2/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@793060686 close
2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@543530928 close
2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@543530928 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@1255075694 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [: ng-host (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@971447764 close
2020-02-21 20:30:41 [k: ng-url (3/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@1582284274 close
2020-02-21 20:30:41 [: ng-host (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@971447764 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@2092790834 close
2020-02-21 20:30:41 [: ng-host (5/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@268528771 close
2020-02-21 20:30:41 [: ng-host (3/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@97861042 close
2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@543530928 close
2020-02-21 20:30:41 [: ng-host (2/5)] INFO [common.MySqlSink          ] 异步输出 ng-host@989133776 close
2020-02-21 20:30:41 [k: ng-url (2/5)] INFO [common.MySqlSink          ] 异步输出 ng-url@793060686 close
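
Editor's sketch, not part of the original message: to see exactly which sink instances are closed more often than they are opened, the open/close events can be tallied per instance id (the name@hashCode token). The class LifecycleLogTally and its regular expression are assumptions made for illustration; the sketch expects each log entry on a single line ending in open or close, and the sample lines in main are abbreviated from the log above with the Chinese message prefix omitted.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that counts open/close events per sink instance. */
class LifecycleLogTally {
    // Captures "<name>@<hashCode>" followed by "open" or "close" at the end of a line.
    private static final Pattern EVENT =
            Pattern.compile("(\\S+@\\d+)\\s+(open|close)\\s*$");

    /** Returns instance id -> {open count, close count}, in order of first appearance. */
    static Map<String, int[]> tally(Iterable<String> lines) {
        Map<String, int[]> counts = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = EVENT.matcher(line);
            if (!m.find()) {
                continue; // not a sink lifecycle line
            }
            int[] c = counts.computeIfAbsent(m.group(1), k -> new int[2]);
            int index = "open".equals(m.group(2)) ? 0 : 1;
            c[index]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Abbreviated sample lines; only the trailing tokens matter to the pattern.
        List<String> sample = Arrays.asList(
                "2020-02-21 20:29:52 [: ng-host (3/5)] INFO [common.MySqlSink] ng-host@97861042 open",
                "2020-02-21 20:29:52 [k: ng-url (2/5)] INFO [common.MySqlSink] ng-url@793060686 open",
                "2020-02-21 20:29:52 [        ng-host] INFO [common.BatchThread] BatchThread ng-host-1 start",
                "2020-02-21 20:30:41 [k: ng-url (2/5)] INFO [common.MySqlSink] ng-url@793060686 close",
                "2020-02-21 20:30:41 [: ng-host (3/5)] INFO [common.MySqlSink] ng-host@97861042 close",
                "2020-02-21 20:30:41 [k: ng-url (2/5)] INFO [common.MySqlSink] ng-url@793060686 close");
        for (Map.Entry<String, int[]> e : tally(sample).entrySet()) {
            // prints "ng-host@97861042 open=1 close=1" then "ng-url@793060686 open=1 close=2"
            System.out.println(e.getKey() + " open=" + e.getValue()[0] + " close=" + e.getValue()[1]);
        }
    }
}
```

Any instance whose close count exceeds its open count is a candidate for closer inspection.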






--
Is anything in this world hard or easy? Do it, and the hard becomes easy; leave it undone, and the easy becomes hard.