flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Error handling
Date Mon, 21 Nov 2016 16:54:57 GMT
Hmm, I still don't know what could be causing this. Which version of Flink
are you using? Also, when you say
"BUT when the error is thrown inside invoke implementation from
RichSinkFunction, the recovery is not done and also no further items are
processed even if the kafka consumer is working fine. "

you mean that the job will simply stop and not try restarting, right? Have
you set anything as the restarting strategy?

Cheers,
Aljoscha

On Thu, 17 Nov 2016 at 09:48 criss <ctinmota@gmail.com> wrote:

> Hi,
>
> Here is the code which triggers the error(part of sink):
> @Override
> public void invoke(KafkaLog value) throws Exception {
>         ......................
>         if (arg instanceof String && "error".equals((String)arg)) {
>                 throw new IOException("search for error");
>         }
>         ...........................
> }
>
> And here's the entire stack trace that I have from log file:
>
> 2016-11-15 17:44:20,000 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught
> exception while processing timer.
> java.lang.RuntimeException: Could not forward element to next operator
>         at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>         at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
>         at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
>         at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
>         at
>
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>         at
>
> mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
>         at
>
> mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: search for error
>         at
>
> mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
>         at
>
> mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
>         at
>
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
>         at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
>         ... 18 more
> 2016-11-15 17:44:20,036 INFO
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer
> service is shutting down.
> 2016-11-15 17:44:20,036 ERROR org.apache.flink.runtime.taskmanager.Task
> - Task execution failed.
> TimerException{java.lang.RuntimeException: Could not forward element to
> next
> operator}
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next
> operator
>         at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>         at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
>         at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
>         at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
>         at
>
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>         at
>
> mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
>         at
>
> mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
>         ... 7 more
> Caused by: java.io.IOException: search for error
>         at
>
> mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
>         at
>
> mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
>         at
>
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
>         at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
>         ... 18 more
> 2016-11-15 17:44:20,037 INFO  org.apache.flink.runtime.taskmanager.Task
> - TriggerWindow(TumblingProcessingTimeWindows(10000),
>
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@72b9f6df
> },
> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) ->
> Sink: Unnamed (1/2) switched to FAILED with exception.
> TimerException{java.lang.RuntimeException: Could not forward element to
> next
> operator}
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next
> operator
>         at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>         at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
>         at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
>         at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
>         at
>
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>         at
>
> mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
>         at
>
> mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
>         at
>
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
>         at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
>         ... 7 more
> Caused by: java.io.IOException: search for error
>         at
>
> mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
>         at
>
> mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
>         at
>
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
>         at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
>         ... 18 more
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-handling-tp3448p10168.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Mime
View raw message