flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Geoffrey Mon <geof...@gmail.com>
Subject Re: Many operations cause StackOverflowError with AWS EMR YARN cluster
Date Mon, 14 Nov 2016 16:38:33 GMT
Hi Ufuk,

The master instance of the cluster was also a m3.xlarge instance with 15 GB
RAM, which I would've expected to be enough. I have gotten the program to
run successfully on a personal virtual cluster where each node has 8 GB RAM
and where the master node was also a worker node, so the problem appears to
have something to do with YARN's memory behavior (such as on EMR).

Nevertheless, it would probably be a good idea to modify my code to reduce
its memory usage. When running my code on my local cluster, performance was
probably bottlenecked.

The job does use a for loop to run the core operations for a specific
number of times, specified as a command line parameter. If it helps, here
is my code:
Python: https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py
(L260
is the core for loop)
Java:
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
(L120
is the core for loop)
I would expect the join operations to be a big cause of the excessive
memory usage.

Thanks!

Geoffrey

On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi <uce@apache.org> wrote:

> The Python API is in alpha state currently, so we would have to check if
> it is related specifically to that. Looping in Chesnay who worked on that.
>
> The JVM GC error happens on the client side as that's where the optimizer
> runs. How much memory does the client submitting the job have?
>
> How do you compose the job? Do you have nested loops, e.g. for() { ...
> bulk iteration Flink program }?
>
> – Ufuk
>
> On 14 November 2016 at 08:02:26, Geoffrey Mon (geofbot@gmail.com) wrote:
> > Hello all,
> >
> > I have a pretty complicated plan file using the Flink Python API running
> on
> > a AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> > dictionary learning algorithm and has to run a sequence of operations
> many
> > times; each sequence involves bulk iterations with join operations and
> > other more intensive operations, and depends on the results of the
> previous
> > sequence. I have found that when the number of times to run this sequence
> > of operations is high (e.g. 20) I get this exception:
> >
> > Uncaught error from thread
> > [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
> > since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
> > java.lang.StackOverflowError
> > at
> java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> > at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > ..............................
> >
> > I assume this problem is caused by having to send too many serialized
> > operations between Java and Python. When using a Java implementation of
> the
> > same operations, I also get:
> >
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> > at
> org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
> > at
> org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
> > at
> org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:90)
> > at
> org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:69)
> > at
> org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
> > at
> org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)
> > ............
> >
> > The problem seems to caused by YARN's handling of memory, because I have
> > gotten the same Python implementation to work on a smaller, local virtual
> > cluster that is not using YARN, even though my local cluster has far
> fewer
> > computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR
> is
> > using. After the YARN job has failed, sometimes a python process is left
> on
> > the cluster using up most of the RAM.
> >
> > How can I solve this issue? I am unsure of how to reduce the number of
> > operations while keeping the same functionality.
> >
> > Thanks,
> > Geoffrey
> >
>
>

Mime
View raw message