flink-user mailing list archives

From Jürgen Thomann <juergen.thom...@innogames.com>
Subject Re: Re: Changing timeout for cancel command
Date Thu, 13 Apr 2017 07:31:48 GMT
Hi zhijiang,

I checked this value and I haven't configured it, so it should be 
the default of 10 s. I timed the flink cancel command with the time 
command, and it finished after 6 seconds.
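
For reference, a minimal sketch of how the key zhijiang mentioned could be set in flink-conf.yaml. The value 30 s is only an illustrative assumption, not a recommendation.

```yaml
# flink-conf.yaml (fragment)
# Assumed example value; the default mentioned in this thread is 10 s.
akka.ask.timeout: 30 s
```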

After filtering the log down to the messages of one sink, it looks like 
the sink is interrupted within milliseconds of the cancel request. Here 
are the logs from one taskmanager (standalone cluster).

06:53:16,484 INFO org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed).
06:53:16,484 INFO org.apache.flink.runtime.taskmanager.Task                     - Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed) switched from RUNNING to CANCELING.
06:53:16,484 INFO org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed).
06:53:16,503 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
06:53:16,503 ERROR org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - Error while trying to hflushOrSync! java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
         at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
         at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2038)
         at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
         at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
         at sun.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:498)
         at org.apache.flink.streaming.connectors.fs.StreamWriterBase.hflushOrSync(StreamWriterBase.java:72)
         at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:131)
         at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:146)
         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:423)
         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
         at java.lang.Thread.run(Thread.java:745)
06:53:16,504 INFO org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Sink: /tmp/flink/events_invalid HDFS sink (2/2)
06:53:16,504 INFO org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Sink: /tmp/flink/events_invalid HDFS sink (477db6e41932ad9b60c72e14de4488ed)
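
To illustrate what the stack trace above appears to show, here is a minimal, self-contained Java sketch of a thread that is interrupted while its flush is still waiting for an acknowledgement. It uses no Flink or Hadoop classes; BlockingWriter and cancelDuringFlush are hypothetical names made up for this example, and the real sink may behave differently in detail.

```java
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptedFlushSketch {

    // Hypothetical stand-in for a writer whose flush blocks until
    // the downstream pipeline acknowledges the written data.
    static final class BlockingWriter {
        // Never counted down in this sketch, so flush() waits until interrupted.
        private final CountDownLatch acked = new CountDownLatch(1);

        void flush() throws InterruptedIOException {
            try {
                acked.await();
            } catch (InterruptedException e) {
                // Translate the interrupt into an I/O-level exception.
                throw new InterruptedIOException("Interrupted while waiting for ack");
            }
        }
    }

    // Starts a "task" thread that flushes, interrupts it (as a cancel would),
    // and reports what the task thread observed.
    static String cancelDuringFlush() throws InterruptedException {
        BlockingWriter writer = new BlockingWriter();
        AtomicReference<String> outcome = new AtomicReference<>("flushed");
        CountDownLatch started = new CountDownLatch(1);

        Thread task = new Thread(() -> {
            started.countDown();
            try {
                writer.flush();
            } catch (InterruptedIOException e) {
                outcome.set("interrupted: " + e.getMessage());
            }
        });

        task.start();
        started.await();   // wait until the task thread is running
        task.interrupt();  // simulate the cancellation interrupt
        task.join();
        return outcome.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(cancelDuringFlush());
    }
}
```

In this sketch the flush never completes, so whatever was still unacknowledged at the time of the interrupt is not guaranteed to have been written, which would match the shorter-than-expected files described in the original question.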

Best,
Jürgen

On 13.04.2017 07:05, Zhijiang(wangzhijiang999) wrote:
> Hi Jürgen,
>
>      You can set the timeout in the configuration with the key 
> "akka.ask.timeout"; the current default value is 10 s. Hope this 
> helps.
>
>
> cheers,
> zhijiang
>
>
>     ------------------------------------------------------------------
>     From: Jürgen Thomann <juergen.thomann@innogames.com>
>     Sent: Wednesday, 12 April 2017, 19:04
>     To: user <user@flink.apache.org>
>     Subject: Changing timeout for cancel command
>
>     Hi,
>
>     We currently get the following exception if we cancel a job that writes
>     to Hadoop:
>     ERROR org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
>     - Error while trying to hflushOrSync! java.io.InterruptedIOException:
>     Interrupted while waiting for data to be acknowledged by pipeline
>
>     This causes problems if we cancel a job with a savepoint and then
>     resubmit it, because the file sometimes ends up smaller
>     than the file size specified in the valid-length file.
>
>     Is there a way to increase the timeout during cancel to give the flush
>     a bit more time? We currently lose events when this happens.
>
>     Best,
>     Jürgen
>
>
