flink-issues mailing list archives

From "Kostas Kloudas (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4725) BucketingSink throws NPE while restoring state if basePath does not exist
Date Mon, 21 Nov 2016 14:30:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683703#comment-15683703 ]

Kostas Kloudas commented on FLINK-4725:

I think this issue is already resolved in the BucketingSink, and it will be resolved in the RollingSink
by an upcoming pull request.
With these changes, neither sink cleans up pending files when restoring from a failure.

You can also check the discussion here: https://issues.apache.org/jira/browse/FLINK-5083 and
in the PR referenced in that JIRA.
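For code that still has to clean up leftover pending files on restore, the FileNotFoundException
from FileSystem.listFiles() in the trace quoted below can be avoided by treating a missing base
directory as "nothing to clean up". The following is only a minimal sketch, not Flink's code: it uses
java.nio.file as a local stand-in for Hadoop's FileSystem, walks the tree recursively, and the class
name, method name, and ".pending" suffix are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper; java.nio.file stands in for Hadoop's FileSystem here.
public class PendingFileCleanup {

    /**
     * Recursively deletes regular files under basePath whose names end with
     * pendingSuffix and returns how many were deleted. A missing basePath is
     * treated as "nothing to clean up" rather than as an error.
     */
    public static int deletePendingFiles(Path basePath, String pendingSuffix) throws IOException {
        // The sink may never have written anything before the savepoint,
        // so the base directory can legitimately be absent on restore.
        if (!Files.isDirectory(basePath)) {
            return 0;
        }
        List<Path> pending;
        // Files.walk holds directory handles open, so close the stream.
        try (Stream<Path> files = Files.walk(basePath)) {
            pending = files
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(pendingSuffix))
                    .collect(Collectors.toList());
        }
        // Delete after the walk completes instead of mutating the tree mid-iteration.
        for (Path p : pending) {
            Files.delete(p);
        }
        return pending.size();
    }
}
```

The existence check is check-then-act: if the base directory could be removed concurrently,
catching java.nio.file.NoSuchFileException around the walk would close that gap.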

> BucketingSink throws NPE while restoring state if basePath does not exist
> -------------------------------------------------------------------------
>                 Key: FLINK-4725
>                 URL: https://issues.apache.org/jira/browse/FLINK-4725
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.2.0, 1.1.2
>            Reporter: Jordan Ganoff
>            Priority: Blocker
> BucketingSink throws a NullPointerException when attempting to clean up pending files if the basePath does not exist.
> The culprit is a [call to org.apache.hadoop.fs.FileSystem.listFiles() on line 784|https://github.com/apache/flink/blob/master/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L784].
> I added this longer description from the duplicate FLINK-4779 (aljoscha):
> When I restore from a savepoint, starting the job fails if the root folder used by the BucketingSink does not exist yet. This can happen in my case when the source for my sink has not yet emitted any messages and I did not create the folder by hand.
> The BucketingSink does not require the complete folder structure to exist, as it creates intermediate folders by itself when creating a bucket.
> I suggest that a missing root folder should not prevent the job from being restarted.
> {code}
> 10/07/2016 22:50:53     Source: Kafka Consumer for X -> (Sink: HDFS for X, Sink: X)(1/1) switched to FAILED
> java.lang.Exception: Failed to restore state to function: Error while deleting old pending
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:184)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:550)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:255)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while deleting old pending files.
>         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:805)
>         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:139)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:182)
>         ... 4 more
> Caused by: java.io.FileNotFoundException: File hdfs://server:8020/1/2/3/4/flink does not exist.
>         at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:948)
>         at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:927)
>         at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:872)
>         at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:868)
>         at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>         at org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:886)
>         at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1696)
>         at org.apache.hadoop.fs.FileSystem$6.<init>(FileSystem.java:1791)
>         at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1787)
>         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:784)
>         ... 6 more
> {code}
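The report notes that the sink creates intermediate folders by itself when it creates a bucket, which
is why a missing root folder only matters for the restore-time listing. A minimal sketch of that
lazy creation, again using java.nio.file as a local stand-in for Hadoop's FileSystem; the class,
method, bucket id, and part-file names are hypothetical and not Flink's API:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical writer; java.nio.file stands in for Hadoop's FileSystem here.
public class BucketWriter {

    /** Appends one record line to basePath/bucketId/partName and returns the part file. */
    public static Path writeToBucket(Path basePath, String bucketId, String partName, String record)
            throws IOException {
        Path bucketDir = basePath.resolve(bucketId);
        // createDirectories creates basePath and every missing intermediate
        // directory, and is a no-op for directories that already exist.
        Files.createDirectories(bucketDir);
        Path part = bucketDir.resolve(partName);
        // CREATE + APPEND: create the part file on first write, append afterwards.
        Files.write(part, (record + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        return part;
    }
}
```

Calling writeToBucket with a base path such as /tmp/1/2/3/4/flink succeeds even when none of those
folders exist yet, which mirrors the behavior the reporter relies on.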

This message was sent by Atlassian JIRA
