flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robert Metzger <rmetz...@apache.org>
Subject Re: Integrate Flink with S3 on EMR cluster
Date Fri, 08 Apr 2016 09:38:26 GMT
Hi Timur,

the Flink optimizer runs on the client, so the exception is thrown from the
JVM running the ./bin/flink client.
Since the statistics sampling is an optional step, its surrounded by a try
/ catch block that just logs the error message.

More answers inline below


On Thu, Apr 7, 2016 at 11:32 PM, Timur Fayruzov <timur.fairuzov@gmail.com>
wrote:

> The exception does not show up in the console when I run the job, it only
> shows in the logs. I thought it means that it happens either on AM or TM (I
> assume what I see in stdout is client log). Is my thinking right?
>
>
> On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi <uce@apache.org> wrote:
>
>> Hey Timur,
>>
>> Just had a chat with Robert about this. I agree that the error message
>> is confusing, but it is fine it this case. The file system classes are
>> not on the class path of the client process, which is submitting the
>> job.
>
> Do you mean that classes should be in the classpath of
> `org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I
> tried to add EMRFS jars to this classpath but it did not help. BTW, it
> seems that bin/flink script assumes that HADOOP_CLASSPATH is set, which is
> not the case in my EMR setup. The HADOOP_CLASSPATH seem to be the only
> point that I control here to add to classpath, so I had to set it manually.
>

Yes, they have to be in the classpath of the CliFrontend.
The client should also work without the HADOOP_CLASSPATH being set. Its
optional for cases where you want to manually add jars to the classpath.
For example on Google Compute they set the HADOOP_CLASSPATH.

Please note that we are not transferring the contents of the
HADOOP_CLASSPATH to the other workers on the cluster. So you have to set
the HADOOP_CLASSPATH on all machines.
Another approach is just putting the required jar into the "lib/" folder of
your Flink installation (the folder is next to "bin/", "conf/", "logs/").


>
>
>> It fails to sample the input file sizes, but this is just an
>> optimization step and hence it does not fail the job and only logs the
>> error.
>>
> Is this optimization only for client side? In other words, does it affect
> Flink's ability to choose proper type of a join?
>

Your DataSet program is translated into a generic representation. Then,
this representation is passed into the optimizer, which decides on join /
sorting / data shipping strategies. The output of the optimizer is sent to
the JobManager for execution.
If the optimizer is not able to get good statistics about the input (like
in your case), it will default to robust execution strategies. I don't know
the input sizes of your job and the structure of your job, but chances are
high that the final plan is the same with and without the input statistics.
Only in cases where one join side is very small the input statistics might
be relevant.
Other optimizations, such as reusing existing data partitioning or ordering
work independent of the input sampling.


>
>
>>
>> After the job is submitted everything should run as expected.
>>
>> You should be able to get rid of that exception by adding the missing
>> classes to the class path of the client process (started via
>> bin/flink), for example via the lib folder.
>>
> The above approach did not work, could you elaborate what you meant by
> 'lib folder'?
>

See above.



>
> Thanks,
> Timur
>
>
>> – Ufuk
>>
>>
>>
>>
>> On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <timur.fairuzov@gmail.com>
>> wrote:
>> > There's one more filesystem integration failure that I have found. My
>> job on
>> > a toy dataset succeeds, but Flink log contains the following message:
>> > 2016-04-07 18:10:01,339 ERROR
>> > org.apache.flink.api.common.io.DelimitedInputFormat           -
>> Unexpected
>> > problen while getting the file statistics for file 's3://...':
>> > java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
>> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>> > java.lang.RuntimeException: java.lang.RuntimeException:
>> > java.lang.ClassNotFoundException: Class
>> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>> >         at
>> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>> >         at
>> >
>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>> >         at
>> >
>> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>> >         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>> >         at
>> >
>> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
>> >         at
>> >
>> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
>> >         at
>> >
>> org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
>> >         at
>> >
>> org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
>> >         at
>> >
>> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
>> >         at
>> >
>> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
>> >         at
>> >
>> org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
>> >         at
>> >
>> org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>> >         at
>> >
>> org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
>> >         at
>> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
>> >         at
>> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>> >         at
>> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
>> >         at
>> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
>> >         at
>> > org.apache.flink.client.program.Client.runBlocking(Client.java:314)
>> >         at
>> >
>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>> >         at
>> >
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>> >         at
>> >
>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>> >         at
>> >
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
>> >         at
>> >
>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>> >         at
>> >
>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>> >         at scala.Option.foreach(Option.scala:257)
>> >         at
>> >
>> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
>> >         at
>> > com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >         at
>> >
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >         at
>> >
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >         at java.lang.reflect.Method.invoke(Method.java:606)
>> >         at
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >         at
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >         at
>> > org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >         at
>> >
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >         at
>> >
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >         at
>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> > Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException:
>> > Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>> >         at
>> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
>> >         at
>> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
>> >         ... 37 more
>> > Caused by: java.lang.ClassNotFoundException: Class
>> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>> >         at
>> >
>> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
>> >         at
>> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>> >         ... 38 more
>> >
>> > I assume this may be a big problem if run on large datasets as there
>> will be
>> > no information for optimizer. I tried to change EMRFS to NativeS3
>> driver,
>> > but get the same error, which is surprising. I expected
>> NativeS3FileSystem
>> > to be in the classpath since it ships with Flink runtime.
>> >
>> > Thanks,
>> > Timur
>> >
>> >
>> > On Wed, Apr 6, 2016 at 2:10 AM, Ufuk Celebi <uce@apache.org> wrote:
>> >>
>> >> Yes, for sure.
>> >>
>> >> I added some documentation for AWS here:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
>> >>
>> >> Would be nice to update that page with your pull request. :-)
>> >>
>> >> – Ufuk
>> >>
>> >>
>> >> On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <chiwanpark@apache.org>
>> wrote:
>> >> > Hi Timur,
>> >> >
>> >> > Great! Bootstrap action for Flink is good for AWS users. I think the
>> >> > bootstrap action scripts would be placed in `flink-contrib`
>> directory.
>> >> >
>> >> > If you want, one of people in PMC of Flink will be assign FLINK-1337
>> to
>> >> > you.
>> >> >
>> >> > Regards,
>> >> > Chiwan Park
>> >> >
>> >> >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <
>> timur.fairuzov@gmail.com>
>> >> >> wrote:
>> >> >>
>> >> >> I had a guide like that.
>> >> >>
>> >> >
>> >
>> >
>>
>
>

Mime
View raw message