flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Biswajit Das <biswajit...@gmail.com>
Subject Re: custom writer fail to recover
Date Thu, 24 Aug 2017 20:28:06 GMT
Hi Stefan ,

My bad , I'm really sorry. I have copied wrong exception stack , during the
recovery after error I'm seeing below exception


Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException):
Cannot truncate to a larger file size. Current size: 31132385, truncate
size: 35985787.
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2089)
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2027)
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:1997)
    at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:930)
    at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:599)
    at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy12.truncate(Unknown Source)
    at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:313)
    at sun.reflect.GeneratedMethodAccessor309.invoke(Unknown Source)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy13.truncate(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:2016)
    at
org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:689)
    at
org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:685)
    at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at
org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:696)
    ... 15 more
2017-08-24 20:22:44,005 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink:
Unnamed (9/16) (af635c8938168acfc85542c830d71002) switched from RUNNING to
FAILED.
java.lang.RuntimeException: Could not invoke truncate.
    at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:846)
    at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:718)
    at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:370)
    at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:177)
    at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:159)
    at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:105)
    at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:675)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:662)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor308.invoke(Unknown Source)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:808)
    ... 11 more
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException):
Cannot truncate to a larger file size. Current size: 38029300, truncate
size: 44601803.
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2089)
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2027)
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:1997)
    at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:930)
    at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:599)
    at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy12.truncate(Unknown Source)
    at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:313)
    at sun.reflect.GeneratedMethodAccessor309.invoke(Unknown Source)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy13.truncate(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:2016)
    at
org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:689)
    at
org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:685)
    at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at
org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:696)
    ... 15 more


On Thu, Aug 24, 2017 at 4:25 AM, Stefan Richter <s.richter@data-artisans.com
> wrote:

> Hi,
>
> I think there are two different things mixed up in your analysis. The
> stack trace that you provided is caused by a failing checkpoint - in
> writing, not in reading. It seems to fail from a Timeout of your HDFS
> connection. This close method has also nothing to do with the close method
> in the writer. It is the close method of the CheckpointOutputStream.
> Furthermore, „could not materialize checkpoint“ seems to happen in cancel,
> so if the checkpoint got canceled that means this is an effect and not the
> cause. There should be another exception further up in the logs that gives
> the real reason why the checkpoint was canceled.
>
> Nevertheless, the timeout is strange and you should check if your DFS is
> properly configured and running as expected. The reported exception should
> have no direct connection with your ParquetWriter. It is possible that the
> checkpoint was canceled because some problem happened in the ParquetWriter,
> but then we are looking at the wrong stack trace.
>
> As for the pending files, different DFS implementations could have
> different points where flush() is called. I think your implementation also
> properly forward to writer.flush?
>
> Best,
> Stefan
>
> > Am 23.08.2017 um 21:05 schrieb Biswajit Das <biswajit.ds@gmail.com>:
> >
> > Hi There ,
> >
> > I'm using custom writer with hourly Rolling Bucket sink . I'm seeing two
> issue
> >
> > first one if write the same file on s3 all the files
> > gets committed , however when I write the same on HDFS I see its remains
> on .pending state , could be related to second problem below
> >
> > Second issue : My custom writer is writing Avro to parquet and writer is
> something like this extended from BaseStreamWriter
> >
> >
> >   @transient private var writer: ParquetWriter[T] = _
> >
> >   override def open(fs: FileSystem, path: Path): Unit = {
> >     val conf = new Configuration()
> >     conf.setBoolean(ADD_LIST_ELEMENT_RECORDS, false)
> >     conf.setBoolean(WRITE_OLD_LIST_STRUCTURE, false)
> >     writer = AvroParquetWriter
> >       .builder[T](path)
> >       .withSchema(new Schema.Parser().parse(schema))
> >       .withCompressionCodec(compressionCodecName)
> >       .withConf(conf)
> >       .build()
> >   }
> >
> >   override def write(element: T): Unit = writer.write(element)
> >
> >   override def duplicate(): Writer[T] = new AvroParquetSinkWriter[T](
> schema)
> >
> >   override def close(): Unit = writer.close()
> >
> >   override def getPos: Long = writer.getDataSize
> >
> >   override def flush(): Long = super.flush()
> >
> >
> > What I noticed during recovering from checkpoint it fails to flush ,
> although I have overriden flush ^^ above . The issue seems
> > it doesn't have handle of stream writer that's why it is failing when
> flush call for stream writer , not sure if first .pedning
> > state is related to this also .
> >
> >
> > --------------------------------------------------
> > 11:52:04.082 [pool-13-thread-1] INFO  o.a.flink.runtime.taskmanager.Task
> - Source: eo_open- kafka source (1/1) (d926613dfcb5ac993a362e9b985e40d6)
> switched from RUNNING to FAILED.
> > org.apache.flink.streaming.runtime.tasks.AsynchronousException:
> java.lang.Exception: Could not materialize checkpoint 4 for operator
> Source:- kafka source (1/1).
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:970)
> ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_73]
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [na:1.8.0_73]
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_73]
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_73]
> >     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
> > Caused by: java.lang.Exception: Could not materialize checkpoint 4 for
> operator Source: eo_open- kafka source (1/1).
> >     ... 6 common frames omitted
> > Caused by: java.util.concurrent.ExecutionException:
> java.io.IOException: Could not flush and close the file system output
> stream to hdfs://XXXX:8020/checkpoint/data/das/
> 2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c
> in order to obtain the stream state handle
> >     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> [na:1.8.0_73]
> >     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> [na:1.8.0_73]
> >     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> ~[flink-core-1.3.2.jar:1.3.2]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:906)
> ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >     ... 5 common frames omitted
> >     Suppressed: java.lang.Exception: Could not properly cancel managed
> operator state future.
> >         at org.apache.flink.streaming.api.operators.
> OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98)
> ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
> ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:961)
> ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >         ... 5 common frames omitted
> >     Caused by: java.util.concurrent.ExecutionException:
> java.io.IOException: Could not flush and close the file system output
> stream to hdfs://xxx:8020/checkpoint/data/das/
> 2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c
> in order to obtain the stream state handle
> >         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(
> FutureUtil.java:43)
> >         at org.apache.flink.runtime.state.StateUtil.
> discardStateFuture(StateUtil.java:85)
> >         at org.apache.flink.streaming.api.operators.
> OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
> >         ... 7 common frames omitted
> >     Caused by: java.io.IOException: Could not flush and close the file
> system output stream to hdfs://XXX:8020/checkpoint/data/das/
> 2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c
> in order to obtain the stream state handle
> >         at org.apache.flink.runtime.state.filesystem.
> FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(
> FsCheckpointStreamFactory.java:336)
> >         at org.apache.flink.runtime.checkpoint.
> AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(
> AbstractAsyncSnapshotIOCallable.java:100)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.
> performOperation(DefaultOperatorStateBackend.java:270)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.
> performOperation(DefaultOperatorStateBackend.java:233)
> >         at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.
> call(AbstractAsyncIOCallable.java:72)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackend.
> snapshot(DefaultOperatorStateBackend.java:288)
> >         at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> checkpointState(StreamTask.java:654)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> performCheckpoint(StreamTask.java:590)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> triggerCheckpoint(StreamTask.java:521)
> >         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.
> triggerCheckpoint(SourceStreamTask.java:112)
> >         at org.apache.flink.runtime.taskmanager.Task$3.run(Task.
> java:1185)
> >         ... 5 common frames omitted
> >     Caused by: org.apache.hadoop.net.ConnectTimeoutException: 60000
> millis timeout while waiting for channel to be ready for connect. ch :
> java.nio.channels.SocketChannel[connection-pending remote=/XXXX:50010]
> >         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
> >         at org.apache.hadoop.hdfs.DFSOutputStream.
> createSocketForPipeline(DFSOutputStream.java:1341)
> >
> >
> >         -------------------------------------
>
>

Mime
View raw message