Date: Mon, 21 Nov 2016 14:30:58 +0000 (UTC)
From: "Kostas Kloudas (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-4725) BucketingSink throws NPE while restoring state if basePath does not exist

[ https://issues.apache.org/jira/browse/FLINK-4725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683703#comment-15683703 ]

Kostas Kloudas commented on FLINK-4725:
---------------------------------------

I think this issue is resolved already in the BucketingSink
and it will be in the RollingSink with an upcoming pull request. Neither of these sinks will perform cleanup when restoring from a failure anymore. See also the discussion in https://issues.apache.org/jira/browse/FLINK-5083 and in the PR referenced there.

> BucketingSink throws NPE while restoring state if basePath does not exist
> -------------------------------------------------------------------------
>
>                 Key: FLINK-4725
>                 URL: https://issues.apache.org/jira/browse/FLINK-4725
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.2.0, 1.1.2
>            Reporter: Jordan Ganoff
>            Priority: Blocker
>
> BucketingSink throws a NullPointerException when attempting to clean up pending files if the basePath does not exist.
> The culprit is a [call to org.apache.hadoop.fs.FileSystem.listFiles() on line 784|https://github.com/apache/flink/blob/master/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L784].
> I added this longer description from the duplicate FLINK-4779 (aljoscha):
> When I restore from a savepoint, starting the job fails if the root folder used by the BucketingSink does not exist yet. In my case this can happen when the source feeding my sink has not emitted any messages yet and I did not create the folder by hand.
> The BucketingSink does not require the complete folder structure to exist, because it creates intermediate folders by itself when creating a bucket.
> I suggest that a missing root folder should not prevent the job from being restarted.
> {code}
> 10/07/2016 22:50:53	Source: Kafka Consumer for X -> (Sink: HDFS for X, Sink: X)(1/1) switched to FAILED
> java.lang.Exception: Failed to restore state to function: Error while deleting old pending files.
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:184)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:550)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:255)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while deleting old pending files.
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:805)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:139)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:182)
> 	... 4 more
> Caused by: java.io.FileNotFoundException: File hdfs://server:8020/1/2/3/4/flink does not exist.
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:948)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:927)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:872)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:868)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:886)
> 	at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1696)
> 	at org.apache.hadoop.fs.FileSystem$6.<init>(FileSystem.java:1791)
> 	at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1787)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:784)
> 	... 6 more
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
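The trace shows the failure mode: listing a base path that was never created throws, and the exception aborts the restore. For illustration only, here is a minimal sketch of a restore-time cleanup that treats a missing base directory as "nothing to clean up" instead of an error. It is not the change that resolved this issue (per the comment above, the sinks simply stop cleaning up on restore). It uses `java.nio.file` as a stand-in for Hadoop's `org.apache.hadoop.fs.FileSystem`; with Hadoop the analogous guard would be `fs.exists(basePath)` before `fs.listFiles(basePath, true)`. The class name `PendingFileCleanup`, the method `cleanUpPendingFiles`, and the `.pending` suffix are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PendingFileCleanup {

    // Hypothetical marker for in-progress part files; real sinks make this configurable.
    static final String PENDING_SUFFIX = ".pending";

    /**
     * Deletes leftover pending files under basePath and returns how many were removed.
     * If basePath does not exist (e.g. the sink never wrote anything before the
     * savepoint was taken), there is nothing to clean up, so return 0 instead of
     * letting the directory listing throw.
     */
    static int cleanUpPendingFiles(Path basePath) throws IOException {
        if (!Files.exists(basePath)) {
            return 0; // guard: walking a missing directory would throw NoSuchFileException
        }
        List<Path> pending;
        try (Stream<Path> walk = Files.walk(basePath)) {
            pending = walk
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(PENDING_SUFFIX))
                    .collect(Collectors.toList());
        }
        for (Path p : pending) {
            Files.delete(p);
        }
        return pending.size();
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("bucketing-demo");
        Path base = tmp.resolve("1/2/3/4/flink");

        // Base path was never created: restore proceeds instead of failing.
        System.out.println(cleanUpPendingFiles(base)); // prints 0

        // createDirectories creates intermediate folders, much like the sink does for buckets.
        Path bucket = Files.createDirectories(base.resolve("bucket-0"));
        Files.createFile(bucket.resolve("_part-0-0.pending"));
        Files.createFile(bucket.resolve("part-0-1"));
        System.out.println(cleanUpPendingFiles(base)); // prints 1
    }
}
```

Checking existence first still leaves a small window in which the directory could disappear between the check and the walk; catching `java.nio.file.NoSuchFileException` around the walk would close that gap.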