flink-user mailing list archives

From Timur Fayruzov <timur.fairu...@gmail.com>
Subject Re: Integrate Flink with S3 on EMR cluster
Date Thu, 07 Apr 2016 21:32:45 GMT
The exception does not show up in the console when I run the job; it only
appears in the logs. I took that to mean it happens on either the AM or a TM
(I assume what I see in stdout is the client log). Is my thinking right?


On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi <uce@apache.org> wrote:

> Hey Timur,
>
> Just had a chat with Robert about this. I agree that the error message
> is confusing, but it is fine it this case. The file system classes are
> not on the class path of the client process, which is submitting the
> job.

Do you mean that the classes should be on the classpath of the
`org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I
tried adding the EMRFS jars to this classpath, but it did not help. BTW, it
seems that the bin/flink script assumes that HADOOP_CLASSPATH is set, which
is not the case in my EMR setup. HADOOP_CLASSPATH seems to be the only point
I control here for extending the classpath, so I had to set it manually.
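For completeness, this is roughly what I did. A minimal sketch; the jar
locations are from my EMR cluster and may well differ on other release
versions:

```shell
# Build HADOOP_CLASSPATH from the EMRFS jars before calling bin/flink.
# These paths are from my EMR cluster and may differ on other releases.
EMRFS_LIB=/usr/share/aws/emr/emrfs/lib
EMRFS_AUXLIB=/usr/share/aws/emr/emrfs/auxlib

# Join all jars with ':' so bin/flink can append them to the client JVM's
# classpath on submission.
export HADOOP_CLASSPATH="$(echo "$EMRFS_LIB"/*.jar "$EMRFS_AUXLIB"/*.jar | tr ' ' ':')"

# ./bin/flink run ... as usual afterwards
```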


> It fails to sample the input file sizes, but this is just an
> optimization step and hence it does not fail the job and only logs the
> error.
>
Is this optimization only for the client side? In other words, does it
affect Flink's ability to choose the proper type of join?
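In case the statistics really are unavailable at optimization time, I guess
one can force a strategy explicitly with join hints. A sketch using the
Scala DataSet API (untested on my side, and the inputs are toy data):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

// Sketch: when the optimizer has no size estimates for the inputs, pick
// the join strategy explicitly instead of relying on sampled statistics.
val env = ExecutionEnvironment.getExecutionEnvironment
val small = env.fromElements((1, "a"), (2, "b"))
val large = env.fromElements((1, 10L), (2, 20L), (3, 30L))

// Broadcast the second (small) input to every node performing the join.
val joined = large
  .join(small, JoinHint.BROADCAST_HASH_SECOND)
  .where(0)
  .equalTo(0)
```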


>
> After the job is submitted everything should run as expected.
>
> You should be able to get rid of that exception by adding the missing
> classes to the class path of the client process (started via
> bin/flink), for example via the lib folder.
>
The above approach did not work. Could you elaborate on what you meant by
the 'lib folder'?
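For the record, what I tried was along these lines (FLINK_HOME falls back
to a scratch directory here just so the sketch is self-contained; on the
cluster it points at the actual install, and the EMRFS path is from my
cluster and may differ):

```shell
# What I tried: copy the EMRFS jars into Flink's lib folder, which (as I
# understand it) is added to the classpath of every Flink process.
FLINK_HOME="${FLINK_HOME:-$(mktemp -d)}"
EMRFS_LIB=/usr/share/aws/emr/emrfs/lib   # path from my cluster, may differ

mkdir -p "$FLINK_HOME/lib"
for jar in "$EMRFS_LIB"/*.jar; do
  if [ -e "$jar" ]; then
    cp "$jar" "$FLINK_HOME/lib/"          # copy only the jars that exist
  fi
done
# (then restarted the YARN session and resubmitted the job)
```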

Thanks,
Timur


> – Ufuk
>
>
>
>
> On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <timur.fairuzov@gmail.com>
> wrote:
> > There's one more filesystem integration failure that I have found. My
> job on
> > a toy dataset succeeds, but Flink log contains the following message:
> > 2016-04-07 18:10:01,339 ERROR
> > org.apache.flink.api.common.io.DelimitedInputFormat           -
> Unexpected
> > problen while getting the file statistics for file 's3://...':
> > java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> > java.lang.RuntimeException: java.lang.RuntimeException:
> > java.lang.ClassNotFoundException: Class
> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> >         at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> >         at
> >
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
> >         at
> >
> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
> >         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
> >         at
> >
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
> >         at
> >
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
> >         at
> >
> org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
> >         at
> >
> org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
> >         at
> >
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
> >         at
> >
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
> >         at
> >
> org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
> >         at
> >
> org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
> >         at
> > org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
> >         at
> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
> >         at
> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
> >         at
> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
> >         at
> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
> >         at
> > org.apache.flink.client.program.Client.runBlocking(Client.java:314)
> >         at
> >
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> >         at
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> >         at
> >
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
> >         at
> >
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
> >         at
> >
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
> >         at
> >
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
> >         at scala.Option.foreach(Option.scala:257)
> >         at
> > com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
> >         at
> > com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >         at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >         at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:606)
> >         at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >         at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >         at
> > org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >         at
> >
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >         at
> >
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >         at
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> > Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException:
> > Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> >         at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
> >         at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
> >         ... 37 more
> > Caused by: java.lang.ClassNotFoundException: Class
> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> >         at
> >
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
> >         at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
> >         ... 38 more
> >
> > I assume this may be a big problem on large datasets, as the optimizer
> > will have no statistics to work with. I tried switching from EMRFS to the
> > NativeS3 driver, but got the same error, which is surprising. I expected
> > NativeS3FileSystem to be on the classpath, since it ships with the Flink
> > runtime.
> >
> > Thanks,
> > Timur
> >
> >
> > On Wed, Apr 6, 2016 at 2:10 AM, Ufuk Celebi <uce@apache.org> wrote:
> >>
> >> Yes, for sure.
> >>
> >> I added some documentation for AWS here:
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
> >>
> >> Would be nice to update that page with your pull request. :-)
> >>
> >> – Ufuk
> >>
> >>
> >> On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <chiwanpark@apache.org>
> wrote:
> >> > Hi Timur,
> >> >
> >> > Great! A bootstrap action for Flink would be useful for AWS users. I
> >> > think the bootstrap action scripts would be placed in the
> >> > `flink-contrib` directory.
> >> >
> >> > If you want, one of the Flink PMC members can assign FLINK-1337 to you.
> >> >
> >> > Regards,
> >> > Chiwan Park
> >> >
> >> >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <timur.fairuzov@gmail.com>
> >> >> wrote:
> >> >>
> >> >> I had a guide like that.
> >> >>
> >> >
> >
> >
>
