flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Geoffrey Mon <geof...@gmail.com>
Subject Re: Many operations cause StackOverflowError with AWS EMR YARN cluster
Date Sun, 20 Nov 2016 22:54:27 GMT

I know that the reuse of the data set in my plan is causing the problem
(after one dictionary atom is learned using the data set "S", "S" is
updated for use with the next dictionary atom). When I comment out the line
updating the data set "S", I have no problem and the plan processing phase
takes substantially less time.

I assume that this is because updating and reusing "S" makes the graph of
transformations much more complicated and forces the optimizer to do much
more work, since for example the final value of "S" depends on all previous
operations combined. Is there a way to replace the for loop in my plan so
that I don't cause this complication and so that memory usage is
manageable? I considered making "S" an iterative data set, but I need to
save each dictionary atom to a file, and I wouldn't be able to do that if
"S" was iterative and not finalized.

Perhaps I would be able to collect "S" at the end of each dictionary atom
and then make the new "S" directly from these values. This however would
require that "collect" be implemented in the Python API.

In addition, I don't think the problem is YARN-specific anymore because I
have been able to reproduce it on a local machine.


On Mon, Nov 14, 2016 at 11:38 AM Geoffrey Mon <geofbot@gmail.com> wrote:

> Hi Ufuk,
> The master instance of the cluster was also a m3.xlarge instance with 15
> GB RAM, which I would've expected to be enough. I have gotten the program
> to run successfully on a personal virtual cluster where each node has 8 GB
> RAM and where the master node was also a worker node, so the problem
> appears to have something to do with YARN's memory behavior (such as on
> EMR).
> Nevertheless, it would probably be a good idea to modify my code to reduce
> its memory usage. When running my code on my local cluster, performance was
> probably bottlenecked.
> The job does use a for loop to run the core operations for a specific
> number of times, specified as a command line parameter. If it helps, here
> is my code:
> Python:
> https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py (L260
> is the core for loop)
> Java:
> https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
> is the core for loop)
> I would expect the join operations to be a big cause of the excessive
> memory usage.
> Thanks!
> Geoffrey
> On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi <uce@apache.org> wrote:
> The Python API is in alpha state currently, so we would have to check if
> it is related specifically to that. Looping in Chesnay who worked on that.
> The JVM GC error happens on the client side as that's where the optimizer
> runs. How much memory does the client submitting the job have?
> How do you compose the job? Do you have nested loops, e.g. for() { ...
> bulk iteration Flink program }?
> – Ufuk
> On 14 November 2016 at 08:02:26, Geoffrey Mon (geofbot@gmail.com) wrote:
> > Hello all,
> >
> > I have a pretty complicated plan file using the Flink Python API running
> on
> > a AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> > dictionary learning algorithm and has to run a sequence of operations
> many
> > times; each sequence involves bulk iterations with join operations and
> > other more intensive operations, and depends on the results of the
> previous
> > sequence. I have found that when the number of times to run this sequence
> > of operations is high (e.g. 20) I get this exception:
> >
> > Uncaught error from thread
> > [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
> > since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
> > java.lang.StackOverflowError
> > at
> java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> > at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > ..............................
> >
> > I assume this problem is caused by having to send too many serialized
> > operations between Java and Python. When using a Java implementation of
> the
> > same operations, I also get:
> >
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> > at
> org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
> > at
> org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
> > at
> org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:90)
> > at
> org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:69)
> > at
> org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
> > at
> org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)
> > ............
> >
> > The problem seems to caused by YARN's handling of memory, because I have
> > gotten the same Python implementation to work on a smaller, local virtual
> > cluster that is not using YARN, even though my local cluster has far
> fewer
> > computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR
> is
> > using. After the YARN job has failed, sometimes a python process is left
> on
> > the cluster using up most of the RAM.
> >
> > How can I solve this issue? I am unsure of how to reduce the number of
> > operations while keeping the same functionality.
> >
> > Thanks,
> > Geoffrey
> >

View raw message