flink-dev mailing list archives

From "Shannon Carey (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()
Date Tue, 11 Oct 2016 18:49:21 GMT
Shannon Carey created FLINK-4803:
------------------------------------

             Summary: Job Cancel can hang forever waiting for OutputFormat.close()
                 Key: FLINK-4803
                 URL: https://issues.apache.org/jira/browse/FLINK-4803
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.1.1
            Reporter: Shannon Carey


If a Flink job uses a badly behaved OutputFormat whose close() method blocks forever (in this
example, a HadoopOutputFormat wrapping a CqlBulkOutputFormat), the job cannot be canceled,
even though the blocked thread would respond to an interrupt. Cancellation calls close() on the
same format from the canceler thread. That call waits for the monitor held by the Task thread,
which is itself stuck inside close(), so the cancel never completes. The stack traces below show
the state of the relevant threads when the job is canceled while the OutputFormat is blocked
inside close().

I suggest updating the `DataSinkTask.cancel()` method to put a timeout on `this.format.close()`.
When the timeout is reached, the Task thread should be interrupted.
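Below is a minimal sketch of the suggested timeout, written outside Flink. It runs close() on a separate daemon thread and bounds the wait with `Future.get(timeout)`. If close() has not returned in time, it interrupts the thread assumed to be stuck in close(). The names `TimedCloseSketch`, `closeWithTimeout` and `BlockingFormat` are hypothetical and are not Flink API. This is one possible approach, not necessarily the fix Flink adopted.

{code:java}
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedCloseSketch {

    /**
     * Hypothetical helper (not Flink API): runs closeable.close() on a separate daemon
     * thread and waits at most timeoutMillis. If close() has not returned by then, it
     * interrupts taskThread, assumed to be the thread stuck inside its own close().
     * Returns true if close() finished (normally or by throwing) within the timeout.
     */
    public static boolean closeWithTimeout(Closeable closeable, Thread taskThread, long timeoutMillis)
            throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "timed-close");
            t.setDaemon(true); // a close() that never returns must not keep the JVM alive
            return t;
        });
        try {
            Callable<Void> closeCall = () -> {
                closeable.close();
                return null;
            };
            Future<Void> closeFuture = executor.submit(closeCall);
            try {
                closeFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (ExecutionException e) {
                return true; // close() returned by throwing, so it is no longer blocking
            } catch (TimeoutException e) {
                taskThread.interrupt();   // wake the task thread that holds the lock
                closeFuture.cancel(true); // and abandon our own close() attempt
                return false;
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /** Hypothetical stand-in for an OutputFormat whose close() blocks until interrupted. */
    public static final class BlockingFormat implements Closeable {
        private final Object lock = new Object();
        private final SynchronousQueue<Object> queue = new SynchronousQueue<>();

        @Override
        public void close() throws IOException {
            synchronized (lock) {
                try {
                    queue.put(new Object()); // no consumer exists, so this waits until interrupted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("close() interrupted", e);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingFormat format = new BlockingFormat();
        Thread task = new Thread(() -> {
            try {
                format.close(); // plays DataSinkTask.invoke() stuck in close()
            } catch (IOException e) {
                System.out.println("task thread: " + e.getMessage());
            }
        }, "task");
        task.start();
        Thread.sleep(100); // let the task thread take the lock first
        boolean closedInTime = closeWithTimeout(format, task, 500); // plays DataSinkTask.cancel()
        task.join(2000);
        System.out.println("closed in time: " + closedInTime + ", task alive: " + task.isAlive());
    }
}
{code}

The close() call runs on its own thread for a specific reason. A thread waiting to enter a `synchronized` block cannot be interrupted out of that wait, so the canceler's own close() attempt can only be abandoned, not woken. Interrupting the thread that already holds the monitor is what frees it.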

{code}
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
        - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
        at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
        at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
        at java.lang.Thread.run(Thread.java:745)

"DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410
daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on condition [0x00007fb7bdf78000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
        at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
        at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
        at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
        at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
        at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
        at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
        - locked <0x00000006bae5f788> (a java.lang.Object)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
{code}
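The two thread states in the dump above can be reproduced outside Flink with a small program. This is a sketch under stated assumptions, and `CancelHangRepro`, its `close()` and `awaitState()` are hypothetical:

* A plain `Object` monitor stands in for the lock taken in `HadoopOutputFormatBase.close()`.
* A `SynchronousQueue` that nobody ever takes from, offered to with a timeout, stands in for the wait inside Cassandra's `SSTableSimpleUnsortedWriter`.

{code:java}
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class CancelHangRepro {
    // Stand-in for the monitor that HadoopOutputFormatBase.close() locks (assumption).
    private final Object lock = new Object();
    // No thread ever takes from this queue, so every offer() times out.
    private final SynchronousQueue<Object> queue = new SynchronousQueue<>();

    /** Hypothetical close(): holds the lock and retries a timed offer until interrupted. */
    public void close() {
        synchronized (lock) {
            try {
                while (!queue.offer(new Object(), 1, TimeUnit.SECONDS)) {
                    // keep retrying, like a writer waiting for a consumer that never arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and give up
            }
        }
    }

    /** Polls until thread t reaches the given state or the timeout passes. */
    static boolean awaitState(Thread t, Thread.State state, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (t.getState() == state) return true;
            Thread.sleep(10);
        }
        return t.getState() == state;
    }

    public static void main(String[] args) throws Exception {
        CancelHangRepro format = new CancelHangRepro();
        Thread task = new Thread(format::close, "task");         // like DataSinkTask.invoke()
        Thread canceler = new Thread(format::close, "canceler"); // like DataSinkTask.cancel()
        task.start();
        awaitState(task, Thread.State.TIMED_WAITING, 2000);
        canceler.start();
        awaitState(canceler, Thread.State.BLOCKED, 2000);
        System.out.println("task: " + task.getState() + ", canceler: " + canceler.getState());

        // Interrupting the task thread makes offer() throw, so close() returns and releases the lock.
        task.interrupt();
        task.join(2000);
        // The canceler then takes the lock and waits in offer() itself, so interrupt it as well.
        canceler.interrupt();
        canceler.join(2000);
        System.out.println("task alive: " + task.isAlive() + ", canceler alive: " + canceler.isAlive());
    }
}
{code}

When run, the program should report the task thread as TIMED_WAITING and the canceler as BLOCKED, matching the dump. A single interrupt to the thread holding the monitor is enough to let both threads make progress.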



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
