flink-user mailing list archives

From Clifford Resnick <cresn...@mediamath.com>
Subject Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?
Date Mon, 18 Jul 2016 17:18:51 GMT
The root cause of this problem seems to be that Flink is copying directories with the FileSystem API.
Unfortunately, unlike the default HDFS implementation, org.apache.hadoop.fs.s3a.S3AFileSystem
does not implement a recursive copyFromLocalFile, so Flink 1.0.3 fails when it tries to copy
a Window Operator savepoint directory. Flink 1.1 is worse: it cannot even set up the session,
because it tries to copy the flink/lib dir on init.

I can work around this in 1.0.3 by subclassing S3AFileSystem and implementing a recursive
copyFromLocalFile. Unfortunately, this isn't good enough for Flink 1.1, since it expects
the copied "lib" directory to exist in cache to set up the classpath with (I think).
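For what it's worth, the recursive walk the S3AFileSystem subclass needs can be sketched with plain java.nio. This is only an illustration of the recursion, not the actual Hadoop FileSystem API; the class and method names here are hypothetical:

```java
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

/**
 * Hypothetical sketch: recursively copy a local directory file by file,
 * the way a copyFromLocalFile override would need to walk a savepoint
 * directory instead of handing S3A a bare directory path (S3A's upload
 * path accepts only regular files, which is what triggers the MD5 error).
 */
public class RecursiveCopy {
    public static void copyRecursively(final Path src, final Path dst) throws IOException {
        Files.walkFileTree(src, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
                    throws IOException {
                // Mirror the directory structure under the destination.
                Files.createDirectories(dst.resolve(src.relativize(dir)));
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                    throws IOException {
                // Copy each regular file individually; never pass a directory.
                Files.copy(file, dst.resolve(src.relativize(file)),
                        StandardCopyOption.REPLACE_EXISTING);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempDirectory("chk-src");
        Files.createDirectories(src.resolve("WindowOperator_10_3"));
        Files.write(src.resolve("WindowOperator_10_3").resolve("state"), "data".getBytes());
        Path dst = Files.createTempDirectory("chk-dst");
        copyRecursively(src, dst);
        System.out.println(Files.exists(dst.resolve("WindowOperator_10_3").resolve("state")));
    }
}
```

In a real subclass the per-file step would delegate to the parent's single-file copyFromLocalFile rather than Files.copy.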

I’m really hoping there is something simple that I’m missing here that someone can fill
me in on. Anyone else successfully up and working with Flink -> Yarn -> S3A? If so,
what version of Hadoop and Flink, and was there anything you did other than configure core-site.xml?
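For anyone comparing setups: a minimal core-site.xml for S3A typically looks like the following (property names as in hadoop-aws 2.6.x; the credential values are placeholders, and your setup may differ):

```xml
<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```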

-Cliff


From: Clifford Resnick <cresnick@mediamath.com>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Saturday, July 16, 2016 at 12:26 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Error using S3a State Backend: Window Operators sending directory instead of fully
qualified file?

Using Flink 1.1-SNAPSHOT, Hadoop-aws 2.6.4

The error I’m getting is :

11:05:44,425 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught
exception while materializing asynchronous checkpoints.
com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/t8/k5764ltj4sq4ft06c1zp0nxn928mwr/T/flink-io-247956be-e422-4222-a512-e3ae321b1590/ede87211c622f86d1ef7b2b323076e79/WindowOperator_10_3/dummy_state/31b7ca7b-dc94-4d40-84c7-4f10ebc644a2/local-chk-1
(Is a directory)
                at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1266)
                at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
                at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
                at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
                at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                at java.lang.Thread.run(Thread.java:745)

In the debugger I noticed that some of the uploaded checkpoints are from the configured /tmp
location. These succeed because the file in the request is fully qualified, but I guess it's
different for WindowOperators? Here the file in the request (using a different /var/folders..
location not configured by me – must be a Mac thing?) is actually a directory. The AWS API
fails when it tries to calculate an MD5 of the directory. The Flink side of the code path is
hard to discern from debugging because it's asynchronous.

I get the same issue whether running locally or on a CentOS-based YARN cluster. Everything
works if I use HDFS instead. Any insight will be greatly appreciated! When I get a chance
later I may try S3n, or perhaps S3a with MD5 verification skipped.

-Cliff



