flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: Data Loss in HDFS after Job failure
Date Tue, 15 Nov 2016 21:40:51 GMT
Hi Dominique,

I totally agree with you on opening a JIRA.
I would suggest adding these “collision” checks for the other
file types as well, for example in-progress and pending files.
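For illustration, such a validation could look roughly like the sketch below. This is only a sketch of the proposed check, not existing Flink API; the class and method names here are hypothetical. The idea is simply that if both the decorating prefix and suffix for a file state are empty, that state's file names collapse onto the finished names, and a restore can then delete committed data.

```java
// Hypothetical sketch of the "collision" check discussed in this thread.
// If the pending (or in-progress) prefix and suffix are both empty, a
// pending file name is identical to the finished one, so cleanup on
// restore may remove committed data. None of these names are Flink API.
public class SinkConfigCheck {

    /** True when the decorated name collapses onto the plain part name. */
    static boolean collides(String prefix, String suffix) {
        return prefix.isEmpty() && suffix.isEmpty();
    }

    static void validate(String pendingPrefix, String pendingSuffix,
                         String inProgressPrefix, String inProgressSuffix) {
        if (collides(pendingPrefix, pendingSuffix)) {
            throw new IllegalArgumentException(
                "Pending prefix and suffix are both empty: pending files "
                + "would be indistinguishable from finished ones.");
        }
        if (collides(inProgressPrefix, inProgressSuffix)) {
            throw new IllegalArgumentException(
                "In-progress prefix and suffix are both empty: in-progress "
                + "files would be indistinguishable from finished ones.");
        }
    }

    public static void main(String[] args) {
        // A safe configuration passes silently.
        validate("_", ".pending", "_", ".in-progress");
        // The configuration from the report below is rejected.
        try {
            validate("", "", "_", ".in-progress");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A warning in the logs, as suggested below, would be a softer alternative to throwing here.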

Thanks for reporting it,
Kostas

> On Nov 15, 2016, at 10:34 PM, Dominique Rondé <dominique.ronde@allsecur.de> wrote:
> 
> Thanks Kostas for this fast and helpful response! I was able to reproduce it, and changing
the prefix and suffix really solved my problem.
> 
> I would like to suggest checking both values and writing a warning to the logs. If there are
no objections, I would like to open a JIRA and add this message.
> 
> Greets
> Dominique 
> 
> 
> 
> Sent from my Samsung device.
> 
> 
> -------- Original message --------
> From: Kostas Kloudas <k.kloudas@data-artisans.com> 
> Date: 15.11.16 15:51 (GMT+01:00) 
> To: user@flink.apache.org 
> Subject: Re: Data Loss in HDFS after Job failure 
> 
> Hello Dominique,
> 
> I think the problem is that you set both the pending prefix and the pending suffix to “”.
> Doing this makes the “committed” or “finished” file paths indistinguishable from
the pending ones.
> Thus they are cleaned up upon restoring.
> 
> Could you undo this, set for example a suffix of “pending” or something similar,
> and let us know if this works?
> 
> Thanks,
> Kostas
> 
> > On Nov 15, 2016, at 2:57 PM, Dominique Rondé <dominique.ronde@allsecur.de> wrote:
> > 
> > Hi @all!
> > 
> > I noticed some strange behavior with the rolling HDFS sink. We consume
> > events from a Kafka topic and write them into an HDFS file system. We use
> > the RollingSink implementation in this way:
> > 
> >    RollingSink<String> sink = new RollingSink<String>("/some/hdfs/directory") //
> >            .setBucketer(new DateTimeBucketer(YYYY_MM_DD)) //
> >            .disableCleanupOnOpen() //
> >            .setBatchSize(10485760L) //
> >            .setPartPrefix("part") //
> >            .setPendingPrefix("") //
> >            .setPendingSuffix("");
> > 
> > In the last few days we had some network trouble that took one or more
> > TaskManagers out of service for some time. For that reason, some Flink
> > jobs were canceled because there were not enough slots available. After
> > the TaskManagers came back, the jobs were restarted. After that, all (!!)
> > HDFS directories were completely clean. This means that no data file was
> > left under the root directory /some/hdfs/directory matching our path and
> > file name pattern. The stack trace below was generated and shows that the
> > job tries to recover from the last state and expects a data file to exist.
> > 
> > java.lang.Exception: Could not restore checkpointed state to operators
> > and functions
> >    at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:552)
> >    at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:250)
> >    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> >    at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.Exception: Failed to restore state to function:
> > Error while restoring RollingSink state.
> >    at
> > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
> >    at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:544)
> >    ... 3 more
> > Caused by: java.lang.RuntimeException: Error while restoring RollingSink
> > state.
> >    at
> > org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:680)
> >    at
> > org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:123)
> >    at
> > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
> >    ... 4 more
> > Caused by: java.io.FileNotFoundException: File does not exist:
> > /some/hdfs/directory/2016-11-07/part-0-0
> >    at
> > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
> >    at
> > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
> >    at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
> >    at
> > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
> >    at
> > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
> >    at
> > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> >    at
> > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> >    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
> >    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
> >    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
> >    at java.security.AccessController.doPrivileged(Native Method)
> >    at javax.security.auth.Subject.doAs(Subject.java:422)
> >    at
> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
> >    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)
> > 
> >    at sun.reflect.GeneratedConstructorAccessor133.newInstance(Unknown
> > Source)
> >    at
> > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> >    at
> > org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
> >    at
> > org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
> >    at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1247)
> >    at
> > org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:279)
> >    at
> > org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:275)
> >    at
> > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> >    at
> > org.apache.hadoop.hdfs.DistributedFileSystem.recoverLease(DistributedFileSystem.java:291)
> >    at
> > org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:625)
> >    ... 6 more
> > Caused by:
> > org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
> > File does not exist: /user/flink/kraft/kraft-vorschlag/2016-11-07/part-0-0
> >    at
> > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
> >    at
> > org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
> >    at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLease(FSNamesystem.java:2877)
> >    at
> > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.recoverLease(NameNodeRpcServer.java:753)
> >    at
> > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.recoverLease(ClientNamenodeProtocolServerSideTranslatorPB.java:671)
> >    at
> > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> >    at
> > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> >    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
> >    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
> >    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
> >    at java.security.AccessController.doPrivileged(Native Method)
> >    at javax.security.auth.Subject.doAs(Subject.java:422)
> >    at
> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
> >    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)
> > 
> >    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> >    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> >    at
> > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> >    at com.sun.proxy.$Proxy13.recoverLease(Unknown Source)
> >    at
> > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.recoverLease(ClientNamenodeProtocolTranslatorPB.java:603)
> >    at sun.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)
> >    at
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >    at java.lang.reflect.Method.invoke(Method.java:497)
> >    at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> >    at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >    at com.sun.proxy.$Proxy14.recoverLease(Unknown Source)
> >    at org.apache.hadoop.hdfs.DFSClient.recoverLease(DFSClient.java:1245)
> >    ... 11 more
> > 
> > Has anyone out there had a similar experience, or any clue how to stop
> > Flink/YARN/HDFS from doing this?
> > 
> > Greets
> > 
> > Dominique
> > 
> > <0x962E5CF3.asc>
> 

