predictionio-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Igor Kasianov <>
Subject Re: UniversalRecommender performance bottleneck in SimilarityAnalysis.cooccurrencesIDSs
Date Tue, 22 Nov 2016 09:13:15 GMT
Thanks for Your reply!

Firstly consider previuos mail, about defaultParalelism
When I set paralelism to 12 (when I have 12 cores), than training take
about 6.5 hours
When I set 12 x 4 = 48, train takes much more time (I have stoped it after
9 hours)
When I set paralellism level to 12:
most of stages have 12 tasks, but
The stage with cooccurrenceIDs (reduce by keys -> filter in package.scala)
only 3 and take 2.5 hours (fastest of two),
When I set parelellism level to 48
most stage have 48 tasks, but the stage with coocurrenceIDs 11 and (fastest
of two takes 4.5 hours)

1) it seems that increase paralelism level to number of cores X 4 is not
obviously a good idea.

2) I'd like to test the level of paralelism = number of cores, but also set
the same level for coocurenceIDs, I have played with ParOpts, but
unfortunatelly it had no effect. I am 'inspired' with Your optimistic
assessment consider restriction of using ParOpts, but how can I learn more
about it? Only from code?

Once more thanks for your help.

Igor Kasianov

2016-11-21 18:59 GMT+02:00 Pat Ferrel <>:

> Do not use ParOpts unless you understand Mahout’s use of them better than
> I do and I’m a committer.
> Mahout tries to define it’s own meta-engine optimizations and they do not
> directly map to Spark. Mahout runs on several backend engines like Spark
> and Flink. ParOpts needs to be understood from Mahout so I only use
> .repartition and when the input is repartitioned, this carries through to
> all operations performed on it.
> There is a .distinct.collect for ids only that creates a BiMap of ids and
> this requires a phase go through one machine but this leads to huge
> performance benefits in several other stages. Scaling your Spark cluster is
> the best way to in increase speed for this phase. There are several
> optimizations already made in dealing with ids, for instance the BiMap is
> created only once for all users and broadcast to executors. The math only
> works out if the user space is identical for all input event types so we
> only calculate them once for the conversion event. Item ids must be created
> for every event since the events may have different item types.
> On Nov 20, 2016, at 3:02 PM, Igor Kasianov <> wrote:
> Yes, thanks.
> Now I see, that You use repartition in DataSource.scala
> But I still have trouble with MAHOUT coocurrencyIDS:
> For test I build mahout 0.13.0-SNAPSHOT as suggested on and
> add ParOpts to coocurrencyIDS (ParOpts(12, 12, false)) link
> <>
> min=12, exact=12, auto=False,
> But as a result it make 19 tasks on my dev machine, but only 3 on spark
> cluster. I can't find any adecuate documentation on mahout DRM.par, and
> can't understand this strange behaviour.
> It seems coocurrencyIDS do not take into account Spark parellism and
> ParOpts.
> Do You have any ideas, how can I control paralelism in coocurrencyIDS,
> because now it use only 3 cores of 12.
> Sincerely,
> Igor Kasianov
> 2016-11-19 23:04 GMT+02:00 Pat Ferrel <>:
>> The current head of the template repo repartitions input based on Spark's
>> default parallelism, which I set on the `pio train` CLI to 4 x #-of-cores.
>> This speeds up the math drastically. There are still some things that look
>> like bottlenecks but taking them out make things slower. The labels you see
>> in the Spark GUI should be considered approximations.
>> The parOpt is a mahout specific way to control partitioning and I avoid
>> it by using the Spark method.
>> On Nov 16, 2016, at 5:56 AM, Igor Kasianov <> wrote:
>> Hi,
>> I'm using UR template and have some trouble with scalability.
>> Training take 18hours (each day) and last 12 hours it use only one core.
>> As I can see URAlgorithm.scala (line 144) call
>> SimilarityAnalysis.cooccurrencesIDSs
>> with data.actions (12 partitions)
>> untill reduceByKey in AtB.scala it executes in parallel
>> but after this it executing in single thread.
>> It is strange, that when SimilarityAnalysis.scala(line 145) call
>> indexedDatasets(0).create(drm, indexedDatasets(0).columnIDs,
>> indexedDatasets(i).columnIDs)
>> it return IndexedDataset with only one partition.
>> As I can see in SimilarityAnalysis.scala(line 63)
>> drmARaw.par(auto = true)
>> May be this cause decreasing the number of partitions.
>> As I can see in master branch of MAHOUT
>> has ParOpt:
>> main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala#L142
>> May be this can fix the problem.
>> So, am I right with root of problems, and how can I fix it?
>> <Screenshot from 2016-11-16 15:42:36.png>
>> I have spark cluster with 12 Cores and 128GB but with increasing number
>> of events, I can't scale UR, beause of this bottleneck
>> P.S., please do not suggest to use event window (I've already use it. but
>> daily numer of events are increasing)

View raw message