flink-user mailing list archives

From Ayush Verma <ayushver...@gmail.com>
Subject Issue using Flink on EMR
Date Mon, 03 Jun 2019 18:02:20 GMT
Hello,

We have a Flink on EMR setup following this guide
<https://github.com/aws-samples/flink-stream-processing-refarch>. YARN
apparently changes the io.tmp.dirs property to /mnt/yarn and /mnt1/yarn. With
these directories in place, the Flink job fails with the following error.

2019-05-22 12:23:12,515 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job my-flink-job (e92bf814c495e2c713e24f1d37aa3afd) switched from state RUNNING to FAILING.
java.nio.file.NoSuchFileException: /mnt/yarn/usercache/hadoop/appcache/application_1558347223117_0001,/mnt1/yarn/usercache/hadoop/appcache/application_1558347223117_0001/.tmp_c729afcc-7bd7-4422-8232-306e28bc62c1
 at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
 at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
 at java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
 at java.nio.file.Files.newOutputStream(Files.java:216)
 at org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator.apply(RefCountedTmpFileCreator.java:80)
 at org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator.apply(RefCountedTmpFileCreator.java:39)
 at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.openNew(RefCountedBufferingFileStream.java:174)
 at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.boundedBufferingFileStream(S3RecoverableFsDataOutputStream.java:271)
 at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.newStream(S3RecoverableFsDataOutputStream.java:236)
 at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:74)
 at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:221)
 at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
 at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
 at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
 at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:565)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
 at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
 at net.skyscanner.data.platform.flink.parquet.ParquetConsumerJob$3.processElement(ParquetConsumerJob.java:96)
 at net.skyscanner.data.platform.flink.parquet.ParquetConsumerJob$3.processElement(ParquetConsumerJob.java:92)
 at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
 at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
 at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:551)
 at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:344)
 at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:231)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
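
The path in the NoSuchFileException contains both directories joined by a
comma, so it looks as if the comma-separated list is being treated as a single
path at some point. Purely as an illustration (this is my own standalone
snippet with made-up paths, not Flink code), the same exception class can be
reproduced by doing exactly that:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class CommaJoinedTmpDirRepro {
    public static void main(String[] args) {
        // Two hypothetical temp directories, joined by a comma the way they
        // appear in the exception message above.
        String joined = "/tmp/dir-a,/tmp/dir-b";

        // Treating the whole list as ONE path means the parent directory
        // "/tmp/dir-a,/tmp/dir-b" does not exist, so creating a file under it
        // fails with java.nio.file.NoSuchFileException.
        Path tmpFile = Paths.get(joined, ".tmp_example");
        try (OutputStream out = Files.newOutputStream(tmpFile)) {
            out.write(0);
        } catch (IOException e) {
            // Prints: java.nio.file.NoSuchFileException: /tmp/dir-a,/tmp/dir-b/.tmp_example
            System.out.println(e);
        }
    }
}

I don't know whether that is actually what happens inside
RefCountedTmpFileCreator, but the shape of the path in the exception above
looks the same.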


If I change io.tmp.dirs to /tmp, for example, the job works fine. But my
understanding is that YARN creates a *shared* directory, mounted into all the
containers, so that these files can survive container terminations. I am
looking for advice on how to investigate this issue further and hopefully
resolve it.
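
For reference, this is roughly how the two configurations compare in
flink-conf.yaml terms (a sketch only; on EMR the /mnt value is set by YARN
rather than written by hand, and I have abbreviated it to the two base
directories):

# works, but bypasses the YARN-managed local directories
io.tmp.dirs: /tmp

# what the cluster effectively runs with (fails as above)
# io.tmp.dirs: /mnt/yarn,/mnt1/yarn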
