flink-dev mailing list archives

From Andra Lungu <lungu.an...@gmail.com>
Subject Re: Memory segment error
Date Mon, 30 Mar 2015 21:25:40 GMT
Hi Fabian,

I'll see what I can do :).
I am just a bit shocked. If this set of coGroups and joins was too much for
a test case, how come the following worked?

https://github.com/andralungu/flink/commit/f60b022de056ac259459b68eee6ff0ae9993f0f8

400 lines of complex computations :) And I have an even bigger one for
which the test also passed...


On Mon, Mar 30, 2015 at 2:31 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Andra,
>
> I found the cause for the exception. Your test case is simply too complex
> for our testing environment.
> We restrict the TM memory for testcases to 80MB in order to execute
> multiple tests in parallel on Travis.
> I counted the memory consumers in your job and got:
>
> - 2 Combine
> - 4 GroupReduce
> - 4 CoGroup
> - 2 Joins
> - 1 SolutionSet
>
> Those are quite a few memory consumers for 20MB per slot (4 slots per TM).
>
> Do you see a way to reduce the number of operators in your testcase, maybe
> by splitting it in half?
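The budget in Fabian's message can be written out in a few lines of plain Java. This is only a sketch. It assumes the 80MB is split evenly across the 4 slots, which matches the 20MB-per-slot figure. It also assumes a 32KB memory page, the page size Fabian gives further down in this thread. The code only totals the pages in one slot and the consumers that must share them. It does not model how Flink actually distributes memory between operators.

```java
// Sketch of the test-environment memory budget described in the mail.
// Assumptions: 80 MB of TaskManager memory split evenly over 4 slots,
// 32 KB memory pages, and the operator counts listed by Fabian.
import java.util.LinkedHashMap;
import java.util.Map;

class TestMemoryBudget {
    static final long MB = 1024L * 1024L;
    static final long PAGE_SIZE = 32L * 1024L;  // 32 KB per memory page
    static final int HASH_JOIN_MIN_PAGES = 33;  // from the error message

    /** Whole pages available in one slot when TM memory is split evenly. */
    static long pagesPerSlot(long tmMemoryBytes, int slots) {
        return (tmMemoryBytes / slots) / PAGE_SIZE;
    }

    /** Total number of memory consumers in the job. */
    static int totalConsumers(Map<String, Integer> consumers) {
        int total = 0;
        for (int n : consumers.values()) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Integer> consumers = new LinkedHashMap<>();
        consumers.put("Combine", 2);
        consumers.put("GroupReduce", 4);
        consumers.put("CoGroup", 4);
        consumers.put("Join", 2);
        consumers.put("SolutionSet", 1);

        System.out.println("pages per slot: " + pagesPerSlot(80 * MB, 4));       // 640
        System.out.println("memory consumers: " + totalConsumers(consumers));    // 13
        System.out.println("pages one hash join needs: " + HASH_JOIN_MIN_PAGES); // 33
    }
}
```

When run, it prints 640 pages per slot and 13 consumers. Each hash join among those consumers needs at least 33 of the slot's pages for itself.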
>
> 2015-03-30 11:01 GMT+02:00 Andra Lungu <lungu.andra@gmail.com>:
>
> > Sure,
> >
> > It was in the first mail but that was sent a while ago :)
> >
> > This is the code:
> > https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
> > I also added the log4j file in case it helps!
> >
> > The error is totally reproducible. 2 out of 2 people got the same.
> > Steps to reproduce:
> > 1) Clone the code and switch to the alphaSplit branch
> > 2) Run CounDegreeITCase.java
> >
> > Hope we can get to the bottom of this! If you need something, just ask.
> >
> >
> > On Mon, Mar 30, 2015 at 10:54 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> >
> > > Hmm, that is really weird.
> > > Can you point me to a branch in your repository and the test case that
> > > gives the error?
> > >
> > > Then I'll have a look at it and try to figure out what's going wrong.
> > >
> > > Cheers, Fabian
> > >
> > > 2015-03-30 10:43 GMT+02:00 Andra Lungu <lungu.andra@gmail.com>:
> > >
> > > > Hello,
> > > >
> > > > I went on and did some further debugging on this issue. Even though the
> > > > exception said that the problem comes from here:
> > > >
> > > > 4837 [Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)] ERROR org.apache.flink.runtime.operators.RegularPactTask  - Error in task code:  Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)
> > > > java.lang.Exception: The data preparation for task 'Join(Join at weighEdges(NodeSplitting.java:117))' , caused an error: Too few memory segments provided. Hash Join needs at least 33 memory segments.
> > > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> > > >     at java.lang.Thread.run(Thread.java:745)
> > > >
> > > > The code there is basically a chain of two joins, a pattern I have
> > > > repeated several times, including in the getTriplets() method, and it
> > > > passed every time. I thought that this could not be right!
> > > >
> > > > So I took each intermediate data set, printed it, and added a
> > > > System.exit(0) afterwards. The exception comes from this method:
> > > > aggregatePartialValuesSplitVertices. Even though it computes the correct
> > > > result, it then throws the memory segment exception (!!! Only in the
> > > > cluster test; everything else works.)
> > > >
> > > > The code in the function is:
> > > >
> > > > import org.apache.flink.api.common.functions.FlatMapFunction;
> > > > import org.apache.flink.api.common.functions.GroupReduceFunction;
> > > > import org.apache.flink.api.java.DataSet;
> > > > import org.apache.flink.graph.Vertex;
> > > > import org.apache.flink.util.Collector;
> > > >
> > > > private static DataSet<Vertex<String, Long>> aggregatePartialValuesSplitVertices(
> > > >       DataSet<Vertex<String, Long>> resultedVertices) {
> > > >
> > > >    return resultedVertices.flatMap(new FlatMapFunction<Vertex<String, Long>, Vertex<String, Long>>() {
> > > >
> > > >       @Override
> > > >       public void flatMap(Vertex<String, Long> vertex,
> > > >                           Collector<Vertex<String, Long>> collector) throws Exception {
> > > >          int pos = vertex.getId().indexOf("_");
> > > >
> > > >          // for a split vertex, strip the "_..." suffix so it maps back to its original id
> > > >          if (pos > -1) {
> > > >             collector.collect(new Vertex<String, Long>(vertex.getId().substring(0, pos), vertex.getValue()));
> > > >          } else {
> > > >             collector.collect(vertex);
> > > >          }
> > > >       }
> > > >    }).groupBy(0).reduceGroup(new GroupReduceFunction<Vertex<String, Long>, Vertex<String, Long>>() {
> > > >
> > > >       @Override
> > > >       public void reduce(Iterable<Vertex<String, Long>> iterable,
> > > >                          Collector<Vertex<String, Long>> collector) throws Exception {
> > > >          long sum = 0;
> > > >          String id = null;
> > > >
> > > >          // sum the partial values of all vertices that share the same id
> > > >          for (Vertex<String, Long> vertex : iterable) {
> > > >             id = vertex.getId();
> > > >             sum += vertex.getValue();
> > > >          }
> > > >
> > > >          collector.collect(new Vertex<String, Long>(id, sum));
> > > >       }
> > > >    });
> > > > }
> > > >
> > > > To me, nothing here seems out of the ordinary; this is regular user code.
> > > > Yet the behaviour in the end is definitely not the expected one. Any idea
> > > > why this might be happening?
> > > >
> > > > Thanks!
> > > > Andra
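Because the function itself computes the right result, its expected output can be checked independently of Flink's memory configuration. The plain-Java sketch below uses no Flink and repeats the same three steps on an in-memory list. It strips the "_" suffix from split vertex ids, groups by the remaining id, and sums the values. Modeling vertices as Map.Entry pairs, the class name, and the sample ids are all assumptions made for illustration. In the Flink job, the groupBy(0).reduceGroup(...) step is the kind of GroupReduce operator that Fabian's later count lists as a memory consumer.

```java
// Plain-Java sketch (no Flink) of what aggregatePartialValuesSplitVertices computes.
// Assumption for illustration: a vertex is modeled as a (id, value) Map.Entry.
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SplitVertexAggregation {
    /** Strips the "_suffix" from a split vertex id, e.g. "a_1" -> "a". */
    static String baseId(String id) {
        int pos = id.indexOf('_');
        return pos > -1 ? id.substring(0, pos) : id;
    }

    /** Mirrors flatMap + groupBy(0) + reduceGroup: sums partial values per base id. */
    static Map<String, Long> aggregate(List<Map.Entry<String, Long>> vertices) {
        Map<String, Long> sums = new TreeMap<>();  // sorted by id for stable output
        for (Map.Entry<String, Long> v : vertices) {
            sums.merge(baseId(v.getKey()), v.getValue(), Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> vertices = List.of(
                Map.entry("a_0", 2L),
                Map.entry("a_1", 3L),
                Map.entry("b", 4L));
        System.out.println(aggregate(vertices));  // {a=5, b=4}
    }
}
```

In the sample input, a_0 and a_1 collapse into a with 2 + 3 = 5, and b passes through unchanged, so main prints {a=5, b=4}.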
> > > >
> > > > On Fri, Mar 27, 2015 at 12:08 AM, Andra Lungu <lungu.andra@gmail.com> wrote:
> > > >
> > > > > Oops! Sorry! I did not know the mailing list does not support attachments :)
> > > > > https://gist.github.com/andralungu/fba36d77f79189daa183
> > > > >
> > > > > On Fri, Mar 27, 2015 at 12:02 AM, Andra Lungu <lungu.andra@gmail.com> wrote:
> > > > >
> > > > >> Hi Fabian,
> > > > >>
> > > > >> I uploaded a file with my execution plan.
> > > > >>
> > > > >> On Thu, Mar 26, 2015 at 11:50 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> > > > >>
> > > > >>> Hi Andra,
> > > > >>>
> > > > >>> The error is independent of the size of the data set. A HashTable needs
> > > > >>> at least 33 memory pages to operate.
> > > > >>> Since you have 820MB of managed memory and the size of a memory page is
> > > > >>> 32KB, there should be more than 25k pages available.
> > > > >>>
> > > > >>> Can you post the execution plan of the program you execute
> > > > >>> (ExecutionEnvironment.getExecutionPlan())?
> > > > >>>
> > > > >>> Best, Fabian
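Fabian's page arithmetic can be checked with a short plain-Java sketch. It assumes 32KB pages and takes the 33-page minimum from the error message. The class and method names are illustrative only. The sketch shows that 820MB holds 26,240 pages, far above 33. However, the minimum applies to the pages handed to one particular join, not to the TaskManager total. A large overall budget therefore rules out the error only if a sufficient share of it actually reaches that operator.

```java
// Sketch of the page arithmetic in Fabian's mail.
// Assumptions: 32 KB pages; 33 pages is the hash join minimum from the error text.
class ManagedMemoryPages {
    static final long PAGE_SIZE = 32L * 1024L;  // 32 KB per memory page
    static final int HASH_JOIN_MIN_PAGES = 33;  // "needs at least 33 memory segments"

    /** Number of whole pages that fit into the given amount of managed memory. */
    static long pages(long managedBytes) {
        return managedBytes / PAGE_SIZE;
    }

    /** True if a hash join that is handed this many pages meets the minimum. */
    static boolean enoughForHashJoin(long pagesForOperator) {
        return pagesForOperator >= HASH_JOIN_MIN_PAGES;
    }

    public static void main(String[] args) {
        long total = pages(820L * 1024L * 1024L);
        System.out.println("pages in 820 MB: " + total);                      // 26240
        System.out.println("32 pages enough? " + enoughForHashJoin(32));      // false
        System.out.println("all pages enough? " + enoughForHashJoin(total));  // true
    }
}
```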
> > > > >>>
> > > > >>> 2015-03-26 23:31 GMT+01:00 Andra Lungu <lungu.andra@gmail.com>:
> > > > >>>
> > > > >>> > For 20 edges and 5 nodes, that should be more than enough.
> > > > >>> >
> > > > >>> > On Thu, Mar 26, 2015 at 11:24 PM, Andra Lungu <lungu.andra@gmail.com> wrote:
> > > > >>> >
> > > > >>> > > Sure,
> > > > >>> > >
> > > > >>> > > 3470 [main] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Using 820 MB for Flink managed memory.
> > > > >>> > >
> > > > >>> > > On Thu, Mar 26, 2015 at 4:48 PM, Robert Metzger <rmetzger@apache.org> wrote:
> > > > >>> > >
> > > > >>> > >> Hi,
> > > > >>> > >>
> > > > >>> > >> During startup, Flink will log something like:
> > > > >>> > >> 16:48:09,669 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Using 1193 MB for Flink managed memory.
> > > > >>> > >>
> > > > >>> > >> Can you tell us how much memory Flink is managing in your case?
> > > > >>> > >>
> > > > >>> > >>
> > > > >>> > >>
> > > > >>> > >> On Thu, Mar 26, 2015 at 4:46 PM, Andra Lungu <lungu.andra@gmail.com> wrote:
> > > > >>> > >>
> > > > >>> > >> > Hello everyone,
> > > > >>> > >> >
> > > > >>> > >> > I guess I need to revive this old discussion:
> > > > >>> > >> >
> > > > >>> > >> > http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Memory-segment-error-when-migrating-functional-code-from-Flink-0-9-to-0-8-td3687.html
> > > > >>> > >> >
> > > > >>> > >> > At that point, the fix was to kindly ask Alex to make his project work
> > > > >>> > >> > with 0.9.
> > > > >>> > >> >
> > > > >>> > >> > Now, I am not that lucky!
> > > > >>> > >> >
> > > > >>> > >> > This is the code:
> > > > >>> > >> > https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
> > > > >>> > >> >
> > > > >>> > >> > The main program (NodeSplitting) works nicely and I get the correct
> > > > >>> > >> > result. But if you run the test, you will see that the collection
> > > > >>> > >> > execution works while the cluster execution fails miserably with this
> > > > >>> > >> > exception:
> > > > >>> > >> >
> > > > >>> > >> > Caused by: java.lang.Exception: The data preparation for task 'Join(Join at weighEdges(NodeSplitting.java:112)) (04e172e761148a65783a4363406e08c0)' , caused an error: Too few memory segments provided. Hash Join needs at least 33 memory segments.
> > > > >>> > >> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > > >>> > >> >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > >>> > >> >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:209)
> > > > >>> > >> >     at java.lang.Thread.run(Thread.java:745)
> > > > >>> > >> > Caused by: java.lang.IllegalArgumentException: Too few memory segments provided. Hash Join needs at least 33 memory segments.
> > > > >>> > >> >
> > > > >>> > >> > I am running locally, from IntelliJ, on a tiny graph.
> > > > >>> > >> > $ cat /proc/meminfo
> > > > >>> > >> > MemTotal:       11405696 kB
> > > > >>> > >> > MemFree:         5586012 kB
> > > > >>> > >> > Buffers:          178100 kB
> > > > >>> > >> >
> > > > >>> > >> > I am sure I did not run out of memory...
> > > > >>> > >> >
> > > > >>> > >> > Any thoughts on this?
> > > > >>> > >> >
> > > > >>> > >> > Thanks!
> > > > >>> > >> > Andra
> > > > >>> > >> >
> > > > >>> > >>
> > > > >>> > >
> > > > >>> > >
> > > > >>> >
> > > > >>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > >
> >
>
