flink-user mailing list archives

From wangsan <wamg...@163.com>
Subject Re:Re: Exception in BucketingSink when cancelling Flink job
Date Fri, 29 Sep 2017 01:45:26 GMT

The 'join' method can be called with a timeout (as it is in TaskCanceler), so it won't block
forever if the respective thread is deadlocked. Maybe calling 'interrupt()' after
'join(timeout)' is more reasonable, although it still cannot guarantee that the operations inside
the 'close()' method have finished.
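A minimal sketch of the ordering proposed above, assuming a worker thread whose close() is simulated with Thread.sleep(): the canceler first waits a bounded grace period with join(timeout) and interrupts only if the thread is still alive. JoinThenInterrupt and its method are hypothetical names, not Flink code. The second call in main shows the caveat: if close() needs longer than the grace period, it is still cut short.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: JoinThenInterrupt is a hypothetical class, not part of Flink.
public class JoinThenInterrupt {

    /**
     * Starts a worker whose simulated close() takes closeMillis, waits up to
     * graceMillis for it, and interrupts it only if it is still running.
     * Returns true if the simulated close() finished without being interrupted.
     */
    public static boolean cancel(long closeMillis, long graceMillis) throws InterruptedException {
        AtomicBoolean closedCleanly = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(closeMillis); // stands in for a close() waiting on acks
                closedCleanly.set(true);
            } catch (InterruptedException e) {
                // interrupted before the simulated close() completed
            }
        });
        worker.start();

        worker.join(graceMillis);          // bounded wait: cannot block forever
        if (worker.isAlive()) {
            worker.interrupt();            // interrupt only after the grace period
            worker.join(graceMillis);      // again bounded, in case the thread is stuck
        }
        return closedCleanly.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(cancel(100, 2_000)); // close fits in the grace period: true
        System.out.println(cancel(5_000, 100)); // close is cut short by the interrupt: false
    }
}
```

The trade-off is the one named above: a generous grace period lets close() finish in the common case, but cancellation then takes up to that long before the interrupt is even sent.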


On 2017-09-29 at 01:52, "Stephan Ewen" <stephan@data-artisans.com> wrote:


Calling 'interrupt()' only makes sense before 'join()', because 'join()' blocks until the
respective thread is finished.
The 'interrupt()' call is there to pull the task out of potentially blocking I/O or sleep/wait calls.
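A small sketch of why the order matters, using plain java.lang.Thread rather than Flink code (InterruptBeforeJoin and its helpers are hypothetical names): a thread blocked in sleep() returns promptly from join() once it has been interrupted, whereas without the interrupt, join(timeout) simply waits out its timeout while the thread stays blocked.

```java
// Illustrative only: InterruptBeforeJoin is a hypothetical class, not Flink code.
public class InterruptBeforeJoin {

    /** Starts a thread that blocks in sleep(), standing in for blocking I/O or wait(). */
    public static Thread startSleeper(long millis) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                // the interrupt pulls the thread out of sleep(); it then exits
            }
        });
        t.start();
        return t;
    }

    /** Interrupt first, then join: returns the milliseconds spent in join(). */
    public static long interruptThenJoin(long sleepMillis) throws InterruptedException {
        Thread worker = startSleeper(sleepMillis);
        worker.interrupt();
        long start = System.nanoTime();
        worker.join(sleepMillis);
        return (System.nanoTime() - start) / 1_000_000;
    }

    /** Join without interrupting: reports whether the thread is still blocked afterwards. */
    public static boolean stillBlockedAfterJoin(long sleepMillis, long joinMillis) throws InterruptedException {
        Thread worker = startSleeper(sleepMillis);
        worker.join(joinMillis);            // just waits out the timeout
        boolean alive = worker.isAlive();
        worker.interrupt();                 // clean up the demo thread
        worker.join();
        return alive;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("join after interrupt took ~" + interruptThenJoin(10_000) + " ms");
        System.out.println("still blocked after join(200): " + stillBlockedAfterJoin(10_000, 200));
    }
}
```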

The problem is that HDFS does not handle interrupts correctly; it sometimes deadlocks when an
interrupt hits a stream that has not been closed yet :-(

I think it would be important to make sure (in the Bucketing Sink) that the DFS streams are
closed upon task cancellation.
@aljoscha - adding you to this thread, as you know most about the bucketing sink.
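Closing streams on cancellation usually means close() can be reached from more than one path (a cancel hook and the regular dispose). A common way to make that safe is an idempotent close guard. The sketch below assumes a hypothetical wrapper, CloseOnceStream, around any java.io.OutputStream; it is not part of BucketingSink or Hadoop, and on its own it does not solve the HDFS interrupt problem described above.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper, not part of Flink or Hadoop: the real close() runs at most once,
// no matter how many paths (cancel hook, dispose) end up calling it.
public class CloseOnceStream extends FilterOutputStream {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public CloseOnceStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // delegate in bulk instead of byte by byte
    }

    @Override
    public void close() throws IOException {
        // compareAndSet lets exactly one caller perform the real close
        if (closed.compareAndSet(false, true)) {
            super.close(); // flushes, then closes the wrapped stream
        }
    }

    public boolean isClosed() {
        return closed.get();
    }

    /** Closes a wrapped stream twice and returns how often the underlying close() ran. */
    public static int demo() throws IOException {
        AtomicInteger realCloses = new AtomicInteger();
        OutputStream underlying = new ByteArrayOutputStream() {
            @Override
            public void close() {
                realCloses.incrementAndGet();
            }
        };
        CloseOnceStream stream = new CloseOnceStream(underlying);
        stream.write("data".getBytes(StandardCharsets.UTF_8));
        stream.close(); // e.g. from the cancellation path
        stream.close(); // e.g. from dispose(): no-op
        return realCloses.get();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo()); // 1
    }
}
```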


On Wed, Sep 27, 2017 at 10:18 AM, Stefan Richter <s.richter@data-artisans.com> wrote:


I would speculate that the reason for this order is that we want to shut down the tasks quickly
by interrupting blocking calls in the event of failure, so that recovery can begin as fast
as possible. I am looping in Stephan, who might give more details about this code.


On 27.09.2017 at 07:33, wangsan <wamgsam@163.com> wrote:

After digging into the source code, we found that when a Flink job is canceled, a TaskCanceler
thread is created.

The TaskCanceler thread calls cancel() on the invokable and periodically interrupts the
task thread until it has terminated.

try {
    invokable.cancel();
} catch (Throwable t) {
    logger.error("Error while canceling the task {}.", taskName, t);
}
// ......
executer.interrupt();
try {
    executer.join(interruptInterval);
} catch (InterruptedException e) {
    // we can ignore this
}
// ......
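The cancel sequence described above (call cancel(), interrupt, then a bounded join, repeated while the task thread is alive) can be modeled with plain threads. This is a hypothetical simplification, not Flink's TaskCanceler: CancelerModel, Cancellable and the demo task are illustrative names, and the demo task deliberately swallows its first two interrupts to show why the interrupt has to be repeated.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical model of the sequence above; not Flink's TaskCanceler.
public class CancelerModel {

    /** Hypothetical stand-in for the task's invokable. */
    public interface Cancellable {
        void cancel();
    }

    /**
     * Calls cancel(), then interrupts the task thread and joins it for
     * interruptInterval ms, repeating while the thread is still alive.
     * Returns the number of interrupts sent.
     */
    public static int cancelTask(Thread executer, Cancellable invokable, long interruptInterval) {
        try {
            invokable.cancel();
        } catch (Throwable t) {
            System.err.println("Error while canceling the task: " + t);
        }
        int interrupts = 0;
        while (executer.isAlive()) {
            executer.interrupt();                     // interrupt first ...
            interrupts++;
            try {
                executer.join(interruptInterval);     // ... then wait a bounded time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // the canceler itself was interrupted
                break;
            }
        }
        return interrupts;
    }

    /** Runs the model against a task that swallows its first two interrupts. */
    public static int demo() {
        AtomicInteger seen = new AtomicInteger();
        Thread task = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(60_000); // blocking call
                } catch (InterruptedException e) {
                    if (seen.incrementAndGet() >= 3) {
                        return; // gives up on the third interrupt
                    }
                }
            }
        });
        task.start();
        return cancelTask(task, () -> { }, 100);
    }

    public static void main(String[] args) {
        System.out.println("interrupts sent: " + demo());
    }
}
```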

Notice that TaskCanceler first sends an interrupt signal to the task thread and only then calls
the join method. Since the task thread is at that moment trying to close the DFSOutputStream,
which is waiting for an ack, an InterruptedException is thrown in the task thread.

synchronized (dataQueue) {
  while (!streamerClosed) {
    checkClosed();
    if (lastAckedSeqno >= seqno) {
      break;
    }
    try {
      dataQueue.wait(1000); // when we receive an ack, we notify on
                            // dataQueue
    } catch (InterruptedException ie) {
      throw new InterruptedIOException(
          "Interrupted while waiting for data to be acknowledged by pipeline");
    }
  }
}
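To see how the interrupt turns into the InterruptedIOException in the stack trace below, here is a simplified, runnable model of the quoted wait loop. AckTracker and its methods are hypothetical and leave out the streamerClosed/checkClosed() handling; only the wait(1000) loop and the InterruptedException to InterruptedIOException conversion mirror the quoted code.

```java
import java.io.IOException;
import java.io.InterruptedIOException;

// Simplified, hypothetical model of the quoted wait loop; not the HDFS DFSOutputStream.
public class AckTracker {
    private final Object dataQueue = new Object();
    private long lastAckedSeqno = -1;

    /** Called when an ack arrives. */
    public void ack(long seqno) {
        synchronized (dataQueue) {
            lastAckedSeqno = Math.max(lastAckedSeqno, seqno);
            dataQueue.notifyAll(); // when we receive an ack, we notify on dataQueue
        }
    }

    /** Blocks until seqno is acknowledged; an interrupt aborts the wait with an IOException. */
    public void waitForAckedSeqno(long seqno) throws IOException {
        synchronized (dataQueue) {
            while (lastAckedSeqno < seqno) {
                try {
                    dataQueue.wait(1000);
                } catch (InterruptedException ie) {
                    // As in the quoted code, the interrupt becomes an InterruptedIOException
                    // and the interrupt flag is not restored.
                    throw new InterruptedIOException(
                        "Interrupted while waiting for data to be acknowledged by pipeline");
                }
            }
        }
    }

    /** Interrupts a thread waiting for an ack that never arrives; returns the exception type seen. */
    public static String interruptWhileWaiting() throws InterruptedException {
        AckTracker tracker = new AckTracker();
        String[] outcome = {"none"};
        Thread writer = new Thread(() -> {
            try {
                tracker.waitForAckedSeqno(1); // no ack will ever come
                outcome[0] = "acked";
            } catch (IOException e) {
                outcome[0] = e.getClass().getSimpleName();
            }
        });
        writer.start();
        writer.interrupt(); // what the canceler does while close() is waiting
        writer.join(5_000);
        return outcome[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptWhileWaiting()); // InterruptedIOException
    }
}
```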

I am confused about why TaskCanceler calls executer.interrupt() before executer.join(interruptInterval).
Can anyone help?


We are currently using BucketingSink to save data into HDFS in Parquet format. But whenever the
Flink job is cancelled, we get an exception in BucketingSink's close method. The detailed
exception is shown below:
[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)

at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)

It seems that the DFSOutputStream hasn't been closed before the task thread is forcibly terminated.
We found a similar problem in this thread:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html
However, setting "akka.ask.timeout" to a larger value does not work for us. So how can we make
sure the stream is safely closed when cancelling a job?

