flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Maximilian Michels (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1793) Streaming File Source cannot be canceled
Date Wed, 08 Apr 2015 15:30:16 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485390#comment-14485390
] 

Maximilian Michels commented on FLINK-1793:
-------------------------------------------

Some more debug information. Upon cancellation on all TaskManagers:

{noformat}
4:14:17,864 INFO  org.apache.flink.runtime.taskmanager.Task                     - File Source
-> Flat Map (5/20) switched to CANCELING
14:14:17,867 INFO  org.apache.flink.runtime.execution.RuntimeEnvironment         - Canceling
File Source -> Flat Map (5/20) (6d97278384fa2cf2c4b7d2851844beea).
14:14:17,878 ERROR org.apache.flink.streaming.api.collector.StreamOutput         - Emit failed
due to: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:91)
        at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:66)
        at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
        at org.apache.flink.streaming.examples.wordcount.WordCount$Tokenizer.flatMap(WordCount.java:105)
        at org.apache.flink.streaming.examples.wordcount.WordCount$Tokenizer.flatMap(WordCount.java:93)
        at org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable.callUserFunction(FlatMapInvokable.java:42)
        at org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
        at org.apache.flink.streaming.api.invokable.ChainableInvokable.collect(ChainableInvokable.java:54)
        at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
        at org.apache.flink.streaming.api.function.source.FileSourceFunction.run(FileSourceFunction.java:72)
        at org.apache.flink.streaming.api.invokable.SourceInvokable.callUserFunction(SourceInvokable.java:42)
        at org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
        at org.apache.flink.streaming.api.invokable.SourceInvokable.invoke(SourceInvokable.java:37)
        at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:170)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
        at java.lang.Thread.run(Thread.java:745)

14:14:17,866 INFO  org.apache.flink.runtime.taskmanager.Task                     - File Source
-> Flat Map (15/20) switched to CANCELING
14:14:17,870 INFO  org.apache.flink.runtime.taskmanager.Task                     - StreamDiscretizer
-> SlidingCountGroupedPreReducer -> Window Flatten -> Stream Sink (15/20) switched
to CANCELING
14:14:17,890 INFO  org.apache.flink.runtime.execution.RuntimeEnvironment         - Canceling
File Source -> Flat Map (15/20) (4ce2b555b5032ddd6404568e93147a51).
14:14:17,882 INFO  org.apache.flink.runtime.taskmanager.Task                     - StreamDiscretizer
-> SlidingCountGroupedPreReducer -> Window Flatten -> Stream Sink (5/20) switched
to CANCELING
14:14:17,925 INFO  org.apache.flink.runtime.execution.RuntimeEnvironment         - Canceling
StreamDiscretizer -> SlidingCountGroupedPreReducer -> Window Flatten -> Stream Sink
(5/20) (ba96dfac87ccb3669397425562f5d54d).
14:14:17,915 ERROR org.apache.flink.streaming.api.collector.StreamOutput         - Emit failed
due to: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:91)
        at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:66)
        at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
        at org.apache.flink.streaming.examples.wordcount.WordCount$Tokenizer.flatMap(WordCount.java:105)
        at org.apache.flink.streaming.examples.wordcount.WordCount$Tokenizer.flatMap(WordCount.java:93)
        at org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable.callUserFunction(FlatMapInvokable.java:42)
        at org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
        at org.apache.flink.streaming.api.invokable.ChainableInvokable.collect(ChainableInvokable.java:54)
        at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
        at org.apache.flink.streaming.api.function.source.FileSourceFunction.run(FileSourceFunction.java:72)
        at org.apache.flink.streaming.api.invokable.SourceInvokable.callUserFunction(SourceInvokable.java:42)
        at org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
        at org.apache.flink.streaming.api.invokable.SourceInvokable.invoke(SourceInvokable.java:37)
        at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:170)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
        at java.lang.Thread.run(Thread.java:745)

14:14:17,866 INFO  org.apache.flink.runtime.taskmanager.Task                     - File Source
-> Flat Map (15/20) switched to CANCELING
14:14:17,870 INFO  org.apache.flink.runtime.taskmanager.Task                     - StreamDiscretizer
-> SlidingCountGroupedPreReducer -> Window Flatten -> Stream Sink (15/20) switched
to CANCELING
14:14:17,890 INFO  org.apache.flink.runtime.execution.RuntimeEnvironment         - Canceling
File Source -> Flat Map (15/20) (4ce2b555b5032ddd6404568e93147a51).
14:14:17,882 INFO  org.apache.flink.runtime.taskmanager.Task                     - StreamDiscretizer
-> SlidingCountGroupedPreReducer -> Window Flatten -> Stream Sink (5/20) switched
to CANCELING
14:14:17,925 INFO  org.apache.flink.runtime.execution.RuntimeEnvironment         - Canceling
StreamDiscretizer -> SlidingCountGroupedPreReducer -> Window Flatten -> Stream Sink
(5/20) (ba96dfac87ccb3669397425562f5d54d).
14:14:17,915 ERROR org.apache.flink.streaming.api.collector.StreamOutput         - Emit failed
due to: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:91)
        at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:66)
        at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
        at org.apache.flink.streaming.examples.wordcount.WordCount$Tokenizer.flatMap(WordCount.java:105)
        at org.apache.flink.streaming.examples.wordcount.WordCount$Tokenizer.flatMap(WordCount.java:93)
        at org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable.callUserFunction(FlatMapInvokable.java:42)
        at org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
        at org.apache.flink.streaming.api.invokable.ChainableInvokable.collect(ChainableInvokable.java:54)
        at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
        at org.apache.flink.streaming.api.function.source.FileSourceFunction.run(FileSourceFunction.java:72)
        at org.apache.flink.streaming.api.invokable.SourceInvokable.callUserFunction(SourceInvokable.java:42)
        at org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
        at org.apache.flink.streaming.api.invokable.SourceInvokable.invoke(SourceInvokable.java:37)
        at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:170)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
        at java.lang.Thread.run(Thread.java:745)

14:14:17,915 INFO  org.apache.flink.runtime.execution.RuntimeEnvironment         - Canceling
StreamDiscretizer -> SlidingCountGroupedPreReducer -> Window Flatten -> Stream Sink
(15/20) (ef9e5c001e33e7e7f57436277e38a19c).
14:14:18,013 INFO  org.apache.flink.runtime.taskmanager.Task                     - StreamDiscretizer
-> SlidingCountGroupedPreReducer -> Window Flatten -> Stream Sink (15/20) switched
to CANCELED
14:14:18,020 INFO  org.apache.flink.runtime.taskmanager.Task                     - StreamDiscretizer
-> SlidingCountGroupedPreReducer -> Window Flatten -> Stream Sink (5/20) switched
to CANCELED
14:14:18,021 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Unregister
task with execution ID ef9e5c001e33e7e7f57436277e38a19c.
14:14:18,126 ERROR org.apache.flink.streaming.api.collector.StreamOutput         - Emit failed
due to: java.lang.NullPointerException
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.setNextBuffer(SpanningRecordSerializer.java:93)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
        at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:66)
        at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
        at org.apache.flink.streaming.examples.wordcount.WordCount$Tokenizer.flatMap(WordCount.java:105)
        at org.apache.flink.streaming.examples.wordcount.WordCount$Tokenizer.flatMap(WordCount.java:93)
        at org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable.callUserFunction(FlatMapInvokable.java:42)
        at org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
        at org.apache.flink.streaming.api.invokable.ChainableInvokable.collect(ChainableInvokable.java:54)
        at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
        at org.apache.flink.streaming.api.function.source.FileSourceFunction.run(FileSourceFunction.java:72)
        at org.apache.flink.streaming.api.invokable.SourceInvokable.callUserFunction(SourceInvokable.java:42)
        at org.apache.flink.streaming.api.invokable.StreamInvokable.callUserFunctionAndLogException(StreamInvokable.java:139)
        at org.apache.flink.streaming.api.invokable.SourceInvokable.invoke(SourceInvokable.java:37)
        at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:170)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

After all tasks have thrown the InterruptedException, they throw NullPointerExceptions endlessly.
In this case, they managed to produce 10GB of log data within half an hour..


> Streaming File Source cannot be canceled
> ----------------------------------------
>
>                 Key: FLINK-1793
>                 URL: https://issues.apache.org/jira/browse/FLINK-1793
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Maximilian Michels
>            Assignee: Márton Balassi
>            Priority: Critical
>             Fix For: 0.9
>
>
> The streaming WordCount gets stuck in the "CANCELED" state after it has been canceled
using either the web interface or the command line interface.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message