flink-dev mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: ALS implementation
Date Fri, 05 Jun 2015 23:32:36 GMT
I'll look into it to find the responsible join operation.
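
For reference, once the join is identified, a hint can be passed to it
directly. This is only a minimal sketch of the DataSet API call, not the ALS
code: the `ratings` and `items` data sets and their contents are hypothetical,
and it assumes a Flink version in which print() triggers execution (older
versions may need an explicit env.execute()). The first input has many
duplicate keys, so the hint asks for the hash table to be built from the
second input instead.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

object JoinHintSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input with many duplicate keys (several ratings per item).
    val ratings: DataSet[(Int, Double)] =
      env.fromElements((1, 4.0), (1, 3.5), (1, 5.0), (2, 2.0))

    // Hypothetical input with unique keys (one record per item).
    val items: DataSet[(Int, String)] =
      env.fromElements((1, "item-1"), (2, "item-2"))

    // REPARTITION_HASH_SECOND: both inputs are repartitioned and the hash
    // table is built from the second input, so `ratings` is the probe side.
    val joined = ratings
      .join(items, JoinHint.REPARTITION_HASH_SECOND)
      .where(0)
      .equalTo(0)

    joined.print()
  }
}
```

The traces quoted below go through NonReusingBuildFirstHashMatchIterator, which
suggests that the first input is currently used as the build side.
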
On Jun 5, 2015 10:50 AM, "Stephan Ewen" <sewen@apache.org> wrote:

> There are two different issues here:
>
> 1) Flink does figure out how much memory a join gets, but that memory may
> be too little for the join to work with. Flink currently plans very
> conservatively, often too conservatively, which is something we have on the
> immediate roadmap to fix.
>
> 2) The "Hash Join exceeded recursions" problem is made worse by too little
> memory, but it is usually an indicator that the join is running the wrong way
> anyway. The side with many duplicates should rarely be the build side; in
> most cases it should be the probe side.
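
To illustrate point 2, here is a minimal, self-contained sketch of why
duplicates on the build side hurt. It is a toy model, not Flink's actual
MutableHashTable: it assumes a build-side partition is memory resident when it
holds at most `capacity` records, and that a larger partition is split again
with a level-dependent toy hash, at most `maxLevels` times. All names and
numbers are hypothetical.

```scala
object HashJoinRecursionSketch {
  // Toy model only; all names and numbers are illustrative assumptions.
  val fanOut = 4 // number of sub-partitions created per recursion level

  // Level-dependent toy hash: the level-th base-`fanOut` digit of the key.
  def bucket(key: Int, level: Int): Int = {
    val divisor = List.fill(level)(fanOut).product // fanOut^level (1 for level 0)
    Math.floorMod(key / divisor, fanOut)
  }

  // True if the build-side keys can be split into partitions of at most
  // `capacity` records within `maxLevels` rounds of re-partitioning.
  def fitsAfterPartitioning(keys: Seq[Int], capacity: Int,
                            level: Int = 0, maxLevels: Int = 3): Boolean =
    if (keys.size <= capacity) true
    else if (level >= maxLevels) false
    else keys.groupBy(bucket(_, level)).values
      .forall(fitsAfterPartitioning(_, capacity, level + 1, maxLevels))

  def main(args: Array[String]): Unit = {
    val uniqueKeys = 0 until 64         // e.g. one record per item
    val duplicateKeys = Seq.fill(64)(7) // e.g. 64 ratings for the same item
    println(fitsAfterPartitioning(uniqueKeys, capacity = 8))    // true
    println(fitsAfterPartitioning(duplicateKeys, capacity = 8)) // false
  }
}
```

Records with the same key always land in the same sub-partition, no matter how
often the partition is split, so a build side dominated by one key never
shrinks. The same records on the probe side are simply streamed past the hash
table and do not need to fit into memory.
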
>
>
> Stephan
>
>
>
>
> On Fri, Jun 5, 2015 at 9:13 AM, Felix Neutatz <neutatz@googlemail.com>
> wrote:
>
> > Shouldn't Flink figure out on its own how much memory there is for the
> > join?
> >
> > The detailed trace for the NullPointerException can be found here:
> >
> > https://github.com/FelixNeutatz/IMPRO-3.SS15/blob/8b679f1c2808a2c6d6900824409fbd47e8bed826/NullPointerException.txt
> >
> > Best regards,
> > Felix
> >
> > 2015-06-04 19:41 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com>:
> >
> > > I think it is not a problem of join hints, but rather of too little
> > > memory for the join operator. If you set the temporary directory, the job
> > > is split into smaller parts and thus each operator gets more memory.
> > > Alternatively, you can increase the memory you give to the Task Managers.
> > >
> > > The problem with the NullPointerException won't be solved by this, though.
> > > Could you send the full stack trace for that?
> > >
> > > Cheers,
> > > Till
> > > On Jun 4, 2015 7:10 PM, "Andra Lungu" <lungu.andra@gmail.com> wrote:
> > >
> > > > Hi Felix,
> > > >
> > > > Passing a JoinHint to your function should help.
> > > > See:
> > > >
> > > > http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3CCANC1h_vFFbQyyiKTzCDPihn09r4HE4OLuiuRSJnci_rWc+cccA@mail.gmail.com%3E
> > > >
> > > > Cheers,
> > > > Andra
> > > >
> > > > On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <neutatz@googlemail.com>
> > > > wrote:
> > > >
> > > > > after bug fix:
> > > > >
> > > > > for 100 blocks and standard jvm heap space
> > > > >
> > > > > Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of
> > > > > recursions, without reducing partitions enough to be memory resident.
> > > > > Probably cause: Too many duplicate keys.
> > > > > at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
> > > > > at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
> > > > > at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
> > > > > at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > > > at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > > > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > > at java.lang.Thread.run(Thread.java:745)
> > > > >
> > > > >
> > > > > for 150 blocks and 5G jvm heap space
> > > > >
> > > > > Caused by: java.lang.NullPointerException
> > > > > at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > > > > ...
> > > > >
> > > > > Best regards,
> > > > > Felix
> > > > >
> > > > > 2015-06-04 10:19 GMT+02:00 Felix Neutatz <neutatz@googlemail.com>:
> > > > >
> > > > > > Yes, I will try it again with the newest update :)
> > > > > >
> > > > > > 2015-06-04 10:17 GMT+02:00 Till Rohrmann <
> till.rohrmann@gmail.com
> > >:
> > > > > >
> > > > > > >> If the first error is not fixed by Chiwan's PR, then we should create a
> > > > > > >> JIRA for it so that we do not forget it.
> > > > > > >>
> > > > > > >> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with this
> > > > > > >> version?
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Till
> > > > > >>
> > > > > >> [1] https://github.com/apache/flink/pull/751
> > > > > >>
> > > > > > >> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <chiwanpark@icloud.com>
> > > > > > >> wrote:
> > > > > >>
> > > > > > >> > Hi. The second bug is fixed by the recent change in the PR.
> > > > > > >> > But there is just no test case for the first bug.
> > > > > >> >
> > > > > >> > Regards,
> > > > > >> > Chiwan Park
> > > > > >> >
> > > > > > >> > > On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <uce@apache.org> wrote:
> > > > > >> > >
> > > > > > >> > > I think both are bugs. They are triggered by the different memory
> > > > > > >> > > configurations.
> > > > > > >> > >
> > > > > > >> > > @chiwan: is the 2nd error fixed by your recent change?
> > > > > > >> > >
> > > > > > >> > > @felix: if yes, can you try the 2nd run again with the changes?
> > > > > >> > >
> > > > > > >> > > On Thursday, June 4, 2015, Felix Neutatz <neutatz@googlemail.com>
> > > > > > >> > > wrote:
> > > > > >> > >
> > > > > >> > >> Hi,
> > > > > >> > >>
> > > > > > >> > >> I played a bit with the ALS recommender algorithm. I used the
> > > > > > >> > >> MovieLens dataset:
> > > > > > >> > >>
> > > > > > >> > >> http://files.grouplens.org/datasets/movielens/ml-latest-README.html
> > > > > > >> > >>
> > > > > > >> > >> The rating matrix has 21,063,128 entries (ratings).
> > > > > > >> > >>
> > > > > > >> > >> I ran the algorithm with 3 configurations:
> > > > > >> > >>
> > > > > >> > >> 1. standard jvm heap space:
> > > > > >> > >>
> > > > > >> > >> val als = ALS()
> > > > > >> > >>   .setIterations(10)
> > > > > >> > >>   .setNumFactors(10)
> > > > > >> > >>   .setBlocks(100)
> > > > > >> > >>
> > > > > >> > >> throws:
> > > > > > >> > >> java.lang.RuntimeException: Hash Join bug in memory management: Memory
> > > > > > >> > >> buffers leaked.
> > > > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
> > > > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> > > > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> > > > > > >> > >> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > > > > >> > >> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > > > > >> > >> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > > > > >> > >> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > > > >> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > > > >> > >> at java.lang.Thread.run(Thread.java:745)
> > > > > >> > >>
> > > > > >> > >> 2. 5G jvm heap space
> > > > > >> > >>
> > > > > >> > >> val als = ALS()
> > > > > >> > >>   .setIterations(10)
> > > > > >> > >>   .setNumFactors(10)
> > > > > >> > >>   .setBlocks(150)
> > > > > >> > >>
> > > > > >> > >> throws:
> > > > > >> > >>
> > > > > >> > >> java.lang.NullPointerException
> > > > > > >> > >> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > > > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
> > > > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
> > > > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
> > > > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> > > > > > >> > >> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> > > > > > >> > >> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > > > > >> > >> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > > > > >> > >> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > > > > >> > >> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > > > >> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > > > >> > >> at java.lang.Thread.run(Thread.java:745)
> > > > > >> > >>
> > > > > >> > >> 3. 14G jvm heap space
> > > > > >> > >>
> > > > > >> > >> val als = ALS()
> > > > > >> > >>   .setIterations(10)
> > > > > >> > >>   .setNumFactors(10)
> > > > > >> > >>   .setBlocks(150)
> > > > > >> > >>   .setTemporaryPath("/tmp/tmpALS")
> > > > > >> > >>
> > > > > >> > >> -> works
> > > > > >> > >>
> > > > > > >> > >> Is this a Flink problem or is it just my bad configuration?
> > > > > >> > >>
> > > > > >> > >> Best regards,
> > > > > >> > >> Felix
> > > > > >> > >>
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
