From: Till Rohrmann
To: dev@flink.apache.org
Date: Mon, 08 Jun 2015 11:59:15 +0000
Subject: Re: ALS implementation

Hi Felix,

I tried to reproduce the problem with the *Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident.* exception. I used the same data set and the same settings for ALS. However, on my machine it runs through without this exception.

Could you tell me what computer you're running the program on? How many cores does it have, and how much memory do you give the JVM by default? What does your program look like? Do you only calculate the factorization and print it, or do you also try to calculate predictions?

I found only one join operation where this exception could be thrown, namely the predict operation. However, the join key should be unique for the build side at this position, because you join the user vectors with the user ids of your query there. All other joins use the merge strategy.

Is it correct that the factorization completes before the exception occurs? You can see that in the output if the 10th iteration has been reached. Maybe you can also send us the execution plan: println(env.getExecutionPlan()).

What we could try is to insert a join hint for the respective join operation and see whether this solves your problem.
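A minimal sketch of what such a hint could look like with the DataSet API (the datasets and key positions are illustrative, and this assumes a Flink version whose join method accepts a JoinHint):

    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
    import org.apache.flink.api.scala._

    object JoinHintSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Stand-ins for the user vectors and the query ids of the predict step.
        val userVectors = env.fromElements((1L, Array(0.1, 0.2)), (2L, Array(0.3, 0.4)))
        val queries     = env.fromElements((1L, 10L), (1L, 11L), (2L, 12L))

        // REPARTITION_SORT_MERGE sidesteps hash-table recursion altogether;
        // REPARTITION_HASH_FIRST would instead force the unique-key user
        // vectors to be the build side and the queries the probe side.
        val joined = userVectors
          .join(queries, JoinHint.REPARTITION_SORT_MERGE)
          .where(0)
          .equalTo(0)

        joined.print()
      }
    }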
Cheers,
Till

On Sat, Jun 6, 2015 at 1:32 AM Till Rohrmann wrote:

> I'll look into it to find the responsible join operation.
>
> On Jun 5, 2015 10:50 AM, "Stephan Ewen" wrote:
>
>> There are two different issues here:
>>
>> 1) Flink does figure out how much memory a join gets, but that memory may be too little for the join to accept it. Flink plans very conservatively right now - often too conservatively - which is something we have on the immediate roadmap to fix.
>>
>> 2) The "Hash Join exceeded recursions" problem is made worse by little memory, but it is usually an indicator that the join is running the wrong way anyway. The side with many duplicates should rarely be the build side, but in most cases the probe side.
>>
>> Stephan
>>
>> On Fri, Jun 5, 2015 at 9:13 AM, Felix Neutatz <neutatz@googlemail.com> wrote:
>>
>>> Shouldn't Flink figure out on its own how much memory there is for the join?
>>>
>>> The detailed trace for the NullPointerException can be found here:
>>> https://github.com/FelixNeutatz/IMPRO-3.SS15/blob/8b679f1c2808a2c6d6900824409fbd47e8bed826/NullPointerException.txt
>>>
>>> Best regards,
>>> Felix
>>>
>>> 2015-06-04 19:41 GMT+02:00 Till Rohrmann:
>>>
>>>> I think it is not a problem of join hints, but rather of too little memory for the join operator. If you set the temporary directory, then the job will be split into smaller parts and thus each operator gets more memory. Alternatively, you can increase the memory you give to the Task Managers.
>>>>
>>>> The problem with the NullPointerException won't be solved by this, though. Could you send the full stack trace for that?
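>>>> For example, in flink-conf.yaml (the values are illustrative, assuming the standard configuration keys):
>>>>
>>>>     taskmanager.heap.mb: 4096        # heap per Task Manager JVM
>>>>     taskmanager.memory.fraction: 0.7 # share of the heap Flink manages for joins, sorts and hash tables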
>>>> Cheers,
>>>> Till
>>>>
>>>> On Jun 4, 2015 7:10 PM, "Andra Lungu" wrote:
>>>>
>>>>> Hi Felix,
>>>>>
>>>>> Passing a JoinHint to your function should help; see:
>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3CCANC1h_vFFbQyyiKTzCDPihn09r4HE4OLuiuRSJnci_rWc+cccA@mail.gmail.com%3E
>>>>>
>>>>> Cheers,
>>>>> Andra
>>>>>
>>>>> On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <neutatz@googlemail.com> wrote:
>>>>>
>>>>>> after the bug fix:
>>>>>>
>>>>>> for 100 blocks and standard jvm heap space:
>>>>>>
>>>>>> Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.
>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
>>>>>> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>>>>> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> for 150 blocks and 5G jvm heap space:
>>>>>>
>>>>>> Caused by: java.lang.NullPointerException
>>>>>> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
>>>>>> ...
>>>>>>
>>>>>> Best regards,
>>>>>> Felix
>>>>>>
>>>>>> 2015-06-04 10:19 GMT+02:00 Felix Neutatz <neutatz@googlemail.com>:
>>>>>>
>>>>>>> Yes, I will try it again with the newest update :)
>>>>>>>
>>>>>>> 2015-06-04 10:17 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com>:
>>>>>>>
>>>>>>>> If the first error is not fixed by Chiwan's PR, then we should create a JIRA for it so that we don't forget it.
>>>>>>>>
>>>>>>>> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with this version?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/flink/pull/751
>>>>>>>>
>>>>>>>> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <chiwanpark@icloud.com> wrote:
>>>>>>>>
>>>>>>>>> Hi. The second bug is fixed by the recent change in the PR, but there is just no test case for the first bug.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Chiwan Park
>>>>>>>>>
>>>>>>>>> On Jun 4, 2015, at 5:09 PM, Ufuk Celebi wrote:
>>>>>>>>>
>>>>>>>>>> I think both are bugs. They are triggered by the different memory configurations.
>>>>>>>>>>
>>>>>>>>>> @chiwan: is the 2nd error fixed by your recent change?
>>>>>>>>>>
>>>>>>>>>> @felix: if yes, can you try the 2nd run again with the changes?
>>>>>>>>>>
>>>>>>>>>> On Thursday, June 4, 2015, Felix Neutatz <neutatz@googlemail.com> wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I played a bit with the ALS recommender algorithm. I used the movielens dataset:
>>>>>>>>>>> http://files.grouplens.org/datasets/movielens/ml-latest-README.html
>>>>>>>>>>>
>>>>>>>>>>> The rating matrix has 21,063,128 entries (ratings).
>>>>>>>>>>>
>>>>>>>>>>> I ran the algorithm with 3 configurations:
>>>>>>>>>>>
>>>>>>>>>>> 1. standard jvm heap space:
>>>>>>>>>>>
>>>>>>>>>>> val als = ALS()
>>>>>>>>>>>   .setIterations(10)
>>>>>>>>>>>   .setNumFactors(10)
>>>>>>>>>>>   .setBlocks(100)
>>>>>>>>>>>
>>>>>>>>>>> throws:
>>>>>>>>>>>
>>>>>>>>>>> java.lang.RuntimeException: Hash Join bug in memory management: Memory buffers leaked.
>>>>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
>>>>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
>>>>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
>>>>>>>>>>> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>>>>>>>>>> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>>>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>> 2. 5G jvm heap space:
>>>>>>>>>>>
>>>>>>>>>>> val als = ALS()
>>>>>>>>>>>   .setIterations(10)
>>>>>>>>>>>   .setNumFactors(10)
>>>>>>>>>>>   .setBlocks(150)
>>>>>>>>>>>
>>>>>>>>>>> throws:
>>>>>>>>>>>
>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
>>>>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
>>>>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
>>>>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
>>>>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
>>>>>>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
>>>>>>>>>>> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>>>>>>>>>> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>>>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>
>>>>>>>>>>> 3. 14G jvm heap space:
>>>>>>>>>>>
>>>>>>>>>>> val als = ALS()
>>>>>>>>>>>   .setIterations(10)
>>>>>>>>>>>   .setNumFactors(10)
>>>>>>>>>>>   .setBlocks(150)
>>>>>>>>>>>   .setTemporaryPath("/tmp/tmpALS")
>>>>>>>>>>>
>>>>>>>>>>> -> works
>>>>>>>>>>>
>>>>>>>>>>> Is this a Flink problem or is it just my bad configuration?
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Felix
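For reference, a minimal end-to-end sketch of the kind of job under discussion, assuming the FlinkML 0.9-era API; the file path, column selection, and predict step are illustrative, not Felix's actual program:

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.recommendation.ALS

    object RunALS {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // MovieLens ratings.csv (header line removed): userId,movieId,rating,timestamp.
        // Keep only the first three columns as (user, item, rating).
        val ratings: DataSet[(Int, Int, Double)] = env.readCsvFile[(Int, Int, Double)](
          "ml-latest/ratings.csv",
          includedFields = Array(0, 1, 2))

        val als = ALS()
          .setIterations(10)
          .setNumFactors(10)
          .setBlocks(150)
          .setTemporaryPath("/tmp/tmpALS") // spill intermediates to disk, trading speed for memory

        als.fit(ratings)

        // The predict step joins the learned factors with the query ids -
        // the join Till suspects in the recursion failure.
        val predictions = als.predict(env.fromElements((1, 2), (3, 4)))
        predictions.print()
      }
    }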