From: Kostas Kloudas
Subject: Re: Data Loss in HDFS after Job failure
Date: Tue, 15 Nov 2016 15:51:12 +0100
To: user@flink.apache.org
Message-Id: <81B6B0E2-7EBA-4471-8562-1678F78FFF5F@data-artisans.com>

Hello Dominique,

I think the problem is that you set both the pending prefix and the pending suffix to "". Doing this makes the "committed" or "finished" file paths indistinguishable from the pending ones, so they are cleaned up upon restoring. Could you undo this, set for example a suffix of "pending" or something like that, and let us know if this works?

Thanks,
Kostas

> On Nov 15, 2016, at 2:57 PM, Dominique Rondé wrote:
>
> Hi @all!
>
> I noticed some strange behavior with the rolling HDFS sink. We consume events from a Kafka topic and write them into an HDFS file system. We use the RollingSink implementation in this way:
>
> RollingSink sink = new RollingSink("/some/hdfs/directory")
>     .setBucketer(new DateTimeBucketer(YYYY_MM_DD))
>     .disableCleanupOnOpen()
>     .setBatchSize(10485760L)
>     .setPartPrefix("part")
>     .setPendingPrefix("")
>     .setPendingSuffix("");
>
> Over the last few days we had some network trouble that took one or more TaskManagers out of service for some time. For that reason, some Flink jobs were canceled because not enough slots were available. After the TaskManagers came back, the jobs were restarted. After that, all (!!) HDFS directories were absolutely clean.
> This means that no data file is left under the root directory /some/hdfs/directory matching our path and file name pattern. The stack trace below was generated and shows that the job tries to recover from the last state and expects a data file to exist.
>
> java.lang.Exception: Could not restore checkpointed state to operators and functions
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:552)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:250)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Failed to restore state to function: Error while restoring RollingSink state.
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:544)
>     ... 3 more
> Caused by: java.lang.RuntimeException: Error while restoring RollingSink state.
>     at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:680)
>     at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:123)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>     ... 4 more
> Caused by: java.io.FileNotFoundException: File does not exist: /some/hdfs/directory/2016-11-07/part-0-0
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)
>
>     at sun.reflect.GeneratedConstructorAccessor133.newInstance(Unknown Source)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>     at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1247)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:279)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:275)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.recoverLease(DistributedFileSystem.java:291)
>     at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:625)
>     ... 6 more
> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/flink/kraft/kraft-vorschlag/2016-11-07/part-0-0
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>     at com.sun.proxy.$Proxy13.recoverLease(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.recoverLease(ClientNamenodeProtocolTranslatorPB.java:603)
>     at sun.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy14.recoverLease(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1245)
>     ... 11 more
>
> Has anyone out there had a similar experience, or any clue how to stop Flink/YARN/HDFS from doing this?
>
> Greets
>
> Dominique
>
> <0x962E5CF3.asc>
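[Editor's note: the advice above amounts to restoring non-empty pending markers in the sink configuration. The following is a minimal sketch of that fix, not a verified build: it assumes the Flink 1.1.x RollingSink and DateTimeBucketer from flink-connector-filesystem, and the "_" prefix, ".pending" suffix, and "yyyy-MM-dd" date pattern are illustrative values, not taken from the original post.]

```java
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;

// Configuration fragment only; requires flink-connector-filesystem on the classpath.
RollingSink<String> sink = new RollingSink<String>("/some/hdfs/directory")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd"))   // one bucket directory per day
    .setBatchSize(10485760L)                           // roll part files at ~10 MiB
    .setPartPrefix("part")
    // Non-empty pending markers keep in-progress files distinguishable from
    // committed ones, so restore-time cleanup removes only files that are
    // genuinely pending instead of wiping the finished output as well:
    .setPendingPrefix("_")
    .setPendingSuffix(".pending");
```

With empty pending markers, a committed file and its pending counterpart share the same path, which is why the restore logic in RollingSink.restoreState cleaned up the finished files too.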