flink-user mailing list archives

From Ufuk Celebi <...@apache.org>
Subject Re: Integrate Flink with S3 on EMR cluster
Date Thu, 07 Apr 2016 19:29:09 GMT
Hey Timur,

Just had a chat with Robert about this. I agree that the error message
is confusing, but it is fine in this case. The file system classes are
not on the class path of the client process that submits the job. The
client fails to sample the input file sizes, but since this is just an
optimization step, it does not fail the job and only logs the error.

After the job is submitted everything should run as expected.

You should be able to get rid of that exception by adding the missing
classes to the class path of the client process (started via
bin/flink), for example via the lib folder.
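The fix above can be sketched as shell commands. The jar locations below are assumptions based on a typical EMR master node and may differ per EMR release:

```sh
# Make the EMRFS classes visible to the client process started via bin/flink.
# The EMRFS jar path is an assumption; adjust it for your EMR release.
cp /usr/share/aws/emr/emrfs/lib/*.jar /path/to/flink/lib/

# Alternatively, extend the client's classpath with the full Hadoop classpath:
export HADOOP_CLASSPATH=$(hadoop classpath)
```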

– Ufuk




On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <timur.fairuzov@gmail.com> wrote:
> There's one more filesystem integration failure that I have found. My job on
> a toy dataset succeeds, but Flink log contains the following message:
> 2016-04-07 18:10:01,339 ERROR
> org.apache.flink.api.common.io.DelimitedInputFormat           - Unexpected
> problen while getting the file statistics for file 's3://...':
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.ClassNotFoundException: Class
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>         at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>         at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>         at
> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>         at
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
>         at
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
>         at
> org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
>         at
> org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
>         at
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
>         at
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
>         at
> org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
>         at
> org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>         at
> org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
>         at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
>         at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>         at
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
>         at
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:314)
>         at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>         at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>         at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>         at
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
>         at
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>         at
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>         at scala.Option.foreach(Option.scala:257)
>         at
> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
>         at
> com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException:
> Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>         at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
>         at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
>         ... 37 more
> Caused by: java.lang.ClassNotFoundException: Class
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>         at
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
>         at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>         ... 38 more
>
> I assume this may be a big problem when running on large datasets, as there
> will be no information for the optimizer. I tried to switch from EMRFS to the
> NativeS3 driver, but I get the same error, which is surprising. I expected
> NativeS3FileSystem to be on the classpath, since it ships with the Flink
> runtime.
>
> Thanks,
> Timur
>
>
> On Wed, Apr 6, 2016 at 2:10 AM, Ufuk Celebi <uce@apache.org> wrote:
>>
>> Yes, for sure.
>>
>> I added some documentation for AWS here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
>>
>> Would be nice to update that page with your pull request. :-)
>>
>> – Ufuk
>>
>>
>> On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <chiwanpark@apache.org> wrote:
>> > Hi Timur,
>> >
>> > Great! A bootstrap action for Flink would be useful for AWS users. I
>> > think the bootstrap action scripts should be placed in the
>> > `flink-contrib` directory.
>> >
>> > If you want, one of the Flink PMC members can assign FLINK-1337 to you.
>> >
>> > Regards,
>> > Chiwan Park
>> >
>> >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <timur.fairuzov@gmail.com>
>> >> wrote:
>> >>
>> >> I had a guide like that.
>> >>
>> >
>
>
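For reference, pointing the s3:// scheme at Hadoop's NativeS3FileSystem (as discussed above) is done in Hadoop's core-site.xml. A minimal sketch, assuming Hadoop 2.x property names and placeholder credentials; exact key names can vary by Hadoop version:

```xml
<!-- Sketch: map the s3:// scheme to NativeS3FileSystem instead of EMRFS.
     Credential values are placeholders. -->
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  </property>
  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```

Note that the class implementing the scheme must also be loadable by the client process, which is the classpath issue described earlier in this thread.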
