flink-dev mailing list archives

From wangsan <wamg...@163.com>
Subject Question about job canceling in Flink
Date Wed, 27 Sep 2017 06:55:38 GMT
Hi all,

We are currently using BucketingSink to save data into HDFS in Parquet format, but whenever the
Flink job is canceled we get an exception in BucketingSink's close method. The detailed
exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] -
Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
   at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
   at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
   at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
   at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
   at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
   at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
   at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
   at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
   at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
       …….
   at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
   at java.lang.Thread.run(Thread.java:745)

After digging into the source code, we found that when a Flink job is canceled, a TaskCanceler
thread is created. The TaskCanceler thread calls cancel() on the invokable and then periodically
interrupts the task thread until it has terminated:

try {
  invokable.cancel();
} catch (Throwable t) {
  logger.error("Error while canceling the task {}.", taskName, t);
}
// ......
executer.interrupt();
try {
  executer.join(interruptInterval);
} catch (InterruptedException e) {
  // we can ignore this
}
// ......

Notice that TaskCanceler first sends an interrupt signal to the task thread and only then joins
on it. Since the task thread is at that point trying to close the DFSOutputStream, which is
blocked waiting for acks from the pipeline, an InterruptedException is thrown in the task thread:

synchronized (dataQueue) {
  while (!streamerClosed) {
    checkClosed();
    if (lastAckedSeqno >= seqno) {
      break;
    }
    try {
      dataQueue.wait(1000); // when we receive an ack, we notify on
      // dataQueue
    } catch (InterruptedException ie) {
      throw new InterruptedIOException(
          "Interrupted while waiting for data to be acknowledged by pipeline");
    }
  }
}
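The mechanism can be reproduced outside Flink and HDFS with a minimal sketch (InterruptDemo is my own illustration, not Flink or Hadoop code): interrupting a thread that is blocked in Object.wait() makes it throw InterruptedException, which is exactly the path waitForAckedSeqno converts into the InterruptedIOException above.

```java
public class InterruptDemo {
    // Returns true if the waiting thread was interrupted out of wait().
    static boolean interruptWaiter() throws InterruptedException {
        final Object lock = new Object();
        final boolean[] wasInterrupted = {false};

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait(10_000); // blocks, like dataQueue.wait(1000) above
                } catch (InterruptedException ie) {
                    wasInterrupted[0] = true; // hit when another thread interrupts us
                }
            }
        });
        waiter.start();
        Thread.sleep(200);   // give the waiter time to block in wait()
        waiter.interrupt();  // analogous to TaskCanceler's executer.interrupt()
        waiter.join();
        return wasInterrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupted = " + interruptWaiter()); // prints "interrupted = true"
    }
}
```

Note that even if the interrupt arrives before the waiter reaches wait(), the interrupt status is already set when wait() is entered, so it still throws immediately.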

I am confused about why TaskCanceler calls executer.interrupt() before executer.join(interruptInterval).
Can anyone help?
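For what it's worth, the only mitigation we can think of so far is to swallow the InterruptedIOException during dispose and restore the thread's interrupt flag so cancellation can still proceed. A minimal sketch (closeQuietlyOnInterrupt is a hypothetical helper of ours, not a Flink or Hadoop API):

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;

public class SafeClose {
    // Close the resource, but give up quietly if the close is interrupted.
    // Returns true if close() completed, false if it was interrupted.
    static boolean closeQuietlyOnInterrupt(Closeable c) throws IOException {
        try {
            c.close(); // may block, e.g. waiting for HDFS pipeline acks
            return true;
        } catch (InterruptedIOException iioe) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false; // accept an unclean close; data may need recovery later
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate a close that is interrupted mid-flush.
        Closeable blockingClose = () -> {
            throw new InterruptedIOException("Interrupted while waiting for acks");
        };
        System.out.println("closed cleanly = " + closeQuietlyOnInterrupt(blockingClose));
        // prints "closed cleanly = false"
    }
}
```

But this trades the exception for a possibly incomplete file, so it is not obviously correct either.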

Best,

wangsan