From: Aljoscha Krettek
Date: Mon, 21 Nov 2016 16:54:57 +0000
Subject: Re: Error handling
To: user@flink.apache.org

Hmm, I still don't know what could be causing this. Which version of Flink are you using? Also, when you say

"BUT when the error is thrown inside invoke implementation from RichSinkFunction, the recovery is not done and also no further items are processed even if the kafka consumer is working fine."

you mean that the job will simply stop and not try restarting, right? Have you configured a restart strategy?
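
For context on that last question: a fixed-delay restart policy, in general terms, re-runs a failed job a bounded number of times with a pause between attempts, and gives up once the attempts are used up. The sketch below illustrates only that idea in plain Java. It does not use Flink's API; `FixedDelayRestartSketch`, `runWithRestarts`, and the parameter values are hypothetical names chosen for the example.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class FixedDelayRestartSketch {

    /**
     * Runs the job and, on failure, re-runs it up to maxRestarts times,
     * sleeping delayMillis between attempts. Hypothetical helper, not Flink code.
     */
    static <T> T runWithRestarts(Callable<T> job, int maxRestarts, long delayMillis)
            throws Exception {
        int restarts = 0;
        while (true) {
            try {
                return job.call();
            } catch (Exception e) {
                if (restarts >= maxRestarts) {
                    throw e; // no restart attempts left: the failure is final
                }
                restarts++;
                Thread.sleep(delayMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // The simulated job fails twice with the same exception as in the sink, then succeeds.
        String result = runWithRestarts(() -> {
            if (++calls[0] < 3) {
                throw new IOException("search for error");
            }
            return "finished";
        }, 3, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With `maxRestarts` set to 0 the first failure is final, which is roughly what one would expect to observe when no restarts are configured.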

Cheers,
Aljoscha

On Thu, 17 Nov 2016 at 09:48 criss <ctinmota@gmail.com> wrote:
Hi,

Here is the code which triggers the error (part of the sink):
@Override
public void invoke(KafkaLog value) throws Exception {
        ......................
        if (arg instanceof String && "error".equals((String)arg)) {
                throw new IOException("search for error");
        }
        ...........................
}

And here's the entire stack trace that I have from the log file:

2016-11-15 17:44:20,000 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught exception while processing timer.
java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: search for error
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
        ... 18 more
2016-11-15 17:44:20,036 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
2016-11-15 17:44:20,036 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
        ... 7 more
Caused by: java.io.IOException: search for error
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
        ... 18 more
2016-11-15 17:44:20,037 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(TumblingProcessingTimeWindows(10000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@72b9f6df}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: Unnamed (1/2) switched to FAILED with exception.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
        at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
        ... 7 more
Caused by: java.io.IOException: search for error
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
        at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
        ... 18 more
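
Read from the bottom up, the innermost "Caused by" entry in each trace is the IOException thrown in the sink's invoke; the outer exceptions only wrap it. A minimal sketch in plain Java of how such a chain can be unwrapped to its root cause follows. It has no Flink dependency: `RootCauseDemo` and `rootCause` are made-up names for illustration, and a plain RuntimeException stands in for Flink's TimerException.

```java
import java.io.IOException;

public class RootCauseDemo {

    /** Follows getCause() links until the innermost exception is reached. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical reconstruction of the three-level wrapping seen in the log.
        Throwable sinkError = new IOException("search for error");
        Throwable forwardError =
                new RuntimeException("Could not forward element to next operator", sinkError);
        // Stand-in for TimerException, which is not available without Flink.
        Throwable taskError = new RuntimeException("timer failure", forwardError);

        Throwable root = rootCause(taskError);
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
        // prints "java.io.IOException: search for error"
    }
}
```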





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-handling-tp3448p10168.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.