flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Clifford Resnick <cresn...@mediamath.com>
Subject Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?
Date Fri, 22 Jul 2016 04:25:43 GMT
I have a fix and test for a recursive HDFSCopyToLocal. I also added similar code to Yarn application
staging. However, even though all files and resources now copy correctly, S3A still fails
on Flink session creation. The failure stems from the lib folder being registered as an application
resource (as opposed to its contained contents). Since there is no such thing as a directory
in S3, there is no file creation timestamp, and the local folder resource fails with the following
error:

java.io.IOException: Resource s3a://mm-dev-flink-savepoints/user/ec2-user/.flink/application_1469150519301_0014/lib
changed on src filesystem (expected 0, was 1469155595413
	at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
	at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
	at java.security.AccessController.doPrivileged(Native Method)..

I was stumped by why S3NativeFilesystem does _not_ fail, but reading this perhaps answers
why: http://stackoverflow.com/questions/15619712/newly-created-s3-directory-has-1969-12-31-as-timestamp

Everything works for all FileSystem implementations if I change the staging code to expand
directory resources to file resources, effectively flattening all resources into the base
directory. The only issue with this approach would be if there are like-named classpath resources
in nested directories, but apparently the current implementation only copies jars so perhaps
that’s a non-issue.

Short of altering S3A to perform the linked “hack”, I don’t see how Flink/Yarn/S3a can
work as currently implemented. I can add the resource directory flattening to my impending
PR but I just want to be sure to first mention the risk (like-named nested resources). 

BTW, If anyone is wondering why I’m interested in S3a over S3n, it’s for this: https://issues.apache.org/jira
    /browse/HADOOP-11183. In-memory multipart writes would be a great way to use the rolling
file appender.                                   

-Cliff




On 7/19/16, 4:00 AM, "Ufuk Celebi" <uce@apache.org> wrote:

    Feel free to do the contribution at any time you like. We can also
    always make it part of a bugfix release if it does not make it into
    the upcoming 1.1 RC (probably end of this week or beginning of next).
    Feel free to ping me if you need any feed back or pointers.
    
    – Ufuk
    
    
    On Mon, Jul 18, 2016 at 9:52 PM, Clifford Resnick
    <cresnick@mediamath.com> wrote:
    > In 1.1, AbstractYarnClusterDescriptor pushes contents of flink/lib (local to where
the yarn app is launched) to Yarn with a single directory copy. In 1.0.3 it looked like it
was copying the individual jars.
    >
    > So, yes I did actually change HDFSCopyToLocal, which was easy, but the job staging
in the above class also needs altering. I’m happy to contribute on both though I won’t
be able to get to it until later this week.
    >
    > -Cliff
    >
    >
    >
    > On 7/18/16, 3:38 PM, "Ufuk Celebi" <uce@apache.org> wrote:
    >
    >     Hey Cliff! Good to see that we came to the same conclusion :-) What do
    >     you mean with copying of the "lib" folder? This issue should be the
    >     same for both 1.0 and 1.1. Another work around could be to use the
    >     fully async RocksDB snapshots with Flink 1.1-SNAPSHOT.
    >
    >     If you like, you could also work on the issue I've created by
    >     implementing the recursive File copy in Flink (in HDFSCopyToLocal) and
    >     contribute this via a pull request.
    >
    >     – Ufuk
    >
    >
    >     On Mon, Jul 18, 2016 at 7:22 PM, Clifford Resnick
    >     <cresnick@mediamath.com> wrote:
    >     > Hi Ufuk,
    >     >
    >     > My mail was down, so I missed this response. Thanks for that.
    >     >
    >     > On 7/18/16, 10:38 AM, "Ufuk Celebi" <uce@apache.org> wrote:
    >     >
    >     >     Hey Cliff!
    >     >
    >     >     I was able to reproduce this by locally running a job and RocksDB semi
    >     >     asynchronous checkpoints (current default) to S3A. I've created an
    >     >     issue here: https://issues.apache.org/jira/browse/FLINK-4228.
    >     >
    >     >     Running with S3N it is working as expected. You can use that
    >     >     implementation as a work around. I don't know whether it's possible
to
    >     >     disable creation of MD5 hashes for S3A.
    >     >
    >     >     – Ufuk
    >     >
    >     >     On Sat, Jul 16, 2016 at 6:26 PM, Clifford Resnick
    >     >     <cresnick@mediamath.com> wrote:
    >     >     > Using Flink 1.1-SNAPSHOT, Hadoop-aws 2.6.4
    >     >     >
    >     >     >
    >     >     >
    >     >     > The error I’m getting is :
    >     >     >
    >     >     >
    >     >     >
    >     >     > 11:05:44,425 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask
    >     >     > - Caught exception while materializing asynchronous checkpoints.
    >     >     >
    >     >     > com.amazonaws.AmazonClientException: Unable to calculate MD5 hash:
    >     >     > /var/folders/t8/k5764ltj4sq4ft06c1zp0nxn928mwr/T/flink-io-247956be-e422-4222-a512-e3ae321b1590/ede87211c622f86d1ef7b2b323076e79/WindowOperator_10_3/dummy_state/31b7ca7b-dc94-4d40-84c7-4f10ebc644a2/local-chk-1
    >     >     > (Is a directory)
    >     >     >
    >     >     >                 at
    >     >     > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1266)
    >     >     >
    >     >     >                 at
    >     >     > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
    >     >     >
    >     >     >                 at
    >     >     > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
    >     >     >
    >     >     >                 at
    >     >     > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
    >     >     >
    >     >     >                 at
    >     >     > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
    >     >     >
    >     >     >                 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    >     >     >
    >     >     >                 at
    >     >     > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    >     >     >
    >     >     >                 at
    >     >     > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    >     >     >
    >     >     >                 at java.lang.Thread.run(Thread.java:745)
    >     >     >
    >     >     >
    >     >     >
    >     >     > In the debugger I noticed that some of the uploaded checkpoints
are from the
    >     >     > configured /tmp location. These succeed as file in the request
is fully
    >     >     > qualified, but I guess it’s different for WindowOperators? Here
the file in
    >     >     > the request (using a different /var/folders.. location not configured
by me
    >     >     > – must be a mac thing?) is actually a directory. The AWS api
is failing when
    >     >     > it tries to calculate an MD5 of the directory. The Flink side of
the
    >     >     > codepath is hard to discern from debugging because it’s asynchronous.
    >     >     >
    >     >     >
    >     >     >
    >     >     > I get the same issue whether local or on a CentOs- based YARN cluster.
    >     >     > Everything works if I use HDFS instead. Any insight will be greatly
    >     >     > appreciated! When I get a chance later I may try S3n or perhaps
S3a with MD5
    >     >     > verification skipped.
    >     >     >
    >     >     >
    >     >     >
    >     >     > -Cliff
    >     >     >
    >     >     >
    >     >     >
    >     >     >
    >     >     >
    >     >     >
    >     >     >
    >     >     >
    >     >
    >     >
    >
    >
    

Mime
View raw message