flink-user-zh mailing list archives

From "butnet@163.com" <but...@163.com>
Subject Writing to Kafka occasionally throws FlinkKafka011Exception and stops the Job
Date Sat, 20 Jul 2019 14:36:53 GMT
A job reads data from Kafka, computes statistics,
and writes the results back to Kafka.
Occasionally it throws a FlinkKafka011Exception,
which causes the Job to stop.
How do you usually handle this? Catch the exception and just log it?

Producer construction code:
FlinkKafkaProducer011 producer = new FlinkKafkaProducer011(kafkaOutputTopic, 
        new KafkaSerializationSchema(KAFKA_OUTPUT_TYPE), 
        producerConfig, 
        Optional.of(new KafkaPartitionerByKey<>()), 
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE, 
        kafkaProducersPoolSize);
Kafka configuration:
bootstrap.servers=
enable.auto.commit=true
max.poll.records=1000
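[Editor's note, not part of the original message] For transient "server disconnected" errors under EXACTLY_ONCE, a commonly suggested mitigation is to let the Kafka client retry and to align the transaction timeout with the broker. The keys below are real Kafka producer settings; the values are assumptions to tune for your cluster:

```
# Hypothetical values; adjust for your environment.
# Let the client retry transient disconnects instead of failing the sink.
retries=5
retry.backoff.ms=500
# EXACTLY_ONCE uses Kafka transactions; the producer's transaction.timeout.ms
# must not exceed the broker's transaction.max.timeout.ms (broker default: 15 min).
transaction.timeout.ms=900000
```

Note that `enable.auto.commit` and `max.poll.records` are consumer settings and have no effect on the producer.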

The following exception occurs occasionally:

2019-07-20 21:41:40,576 INFO  org.apache.flink.runtime.taskmanager.Task                  
  - Window(SlidingEventTimeWindows(120000, 60000), EventTimeTrigger, InterfaceStatisticsAggregate,
InterfaceStatisticsWindow) -> (Map, Sink: Unnamed, Sink: Unnamed) (4/6) (1a9a0b3a306ca983b2fc4227aa7a10fe)
switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could
not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.lambda$apply$0(InterfaceStatisticsWindow.java:35)
  // The code here is the WindowFunction's apply method, emitting into
  // Collector<InterfaceStatisticsOutDto> out: out.collect(item)
        at java.util.Collections$SingletonList.forEach(Collections.java:4822)
        at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.apply(InterfaceStatisticsWindow.java:31)
        at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.apply(InterfaceStatisticsWindow.java:16)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
        ... 7 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send
data to Kafka: The server disconnected before a response was received.
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        ... 25 more

I would like to ask how everyone handles this situation:
Is there a problem with my configuration somewhere?
My current approach is to catch this Exception at the output site; I am not sure how others deal with it:
/**
 * @author butnet
 */
public class InterfaceStatisticsWindow implements WindowFunction<InterfaceStatisticsOutDto,
InterfaceStatisticsOutDto, String, TimeWindow> {
    private static final Logger log = LoggerFactory.getLogger(InterfaceStatisticsWindow.class);
    private static final long serialVersionUID = 1L;

    @Override
    public void apply(String key, TimeWindow window, Iterable<InterfaceStatisticsOutDto>
input,
                      Collector<InterfaceStatisticsOutDto> out) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("InterfaceStatisticsWindow apply:" + key + " start:" + window.getStart()
+ " end:" + window.getEnd());
        }
        input.forEach((Consumer<? super InterfaceStatisticsOutDto>) ir -> {
            ir.setWindowsStartTime(window.getStart());
            ir.setWindowsEndTime(window.getEnd());
            ir.setInterfaceName(key);
            try {
                out.collect(ir);
            } catch (Exception ex) {
                log.info("Output exception: " + ex.toString(), ex);
            }
            if (log.isDebugEnabled()) {
                log.debug("InterfaceStatisticsWindow apply forEach:" + ir.getInterfaceName());
            }
        });
    }
}
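[Editor's note, not part of the original message] Catching the exception inside the WindowFunction silently drops the record and weakens the EXACTLY_ONCE guarantee. An alternative often recommended is to let the task fail and rely on Flink's restart strategy to recover from the last checkpoint. A sketch for flink-conf.yaml, with assumed values:

```
# Assumed values; tune attempts/delay to your checkpoint interval.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

The same strategy can also be set per job via `env.setRestartStrategy(...)` on the StreamExecutionEnvironment.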


Thanks
butnet@163.com