mahout-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dmitriy Lyubimov <dlie...@gmail.com>
Subject Re: spark-itemsimilarity scalability / Spark parallelism issues (SimilarityAnalysis.cooccurrencesIDSs)
Date Wed, 23 Aug 2017 17:16:29 GMT
there has been some work on optimizing in-memory assigns for vectors, but
the matrix work for the in-memory java-backed assigns is admittedly more
patchy at best, given the amount of variations.

On Mon, Aug 21, 2017 at 12:05 PM, Pat Ferrel <pat@occamsmachete.com> wrote:

> Matt
>
> I’ll create a feature branch of Mahout in my git repo for simplicity (we
> are in code freeze for Mahout right now) Then if you could peel off you
> changes and make a PR against it. Everyone can have a look before any
> change is made to the ASF repos.
>
> Do a PR against this https://github.com/pferrel/mahout/tree/sparse-speedup
> <https://github.com/pferrel/mahout/tree/sparse-speedup>, even if it’s not
> working we can take a look. The branch right now is just a snapshot of the
> current master in code freeze.
>
> Mahout has always had methods to work with different levels of sparsity
> and you may have found a missing point to optimize. Let’s hope so.
>
>
> On Aug 21, 2017, at 11:47 AM, Andrew Palumbo <ap.dev@outlook.com> wrote:
>
> I should mention that the densisty is currently set quite high, and we've
> been discussing a user defined setting for this.  Something that we have
> not worked in yet.
>
> ________________________________
> From: Andrew Palumbo <ap.dev@outlook.com>
> Sent: Monday, August 21, 2017 2:44:35 PM
> To: user@mahout.apache.org
> Subject: Re: spark-itemsimilarity scalability / Spark parallelism issues
> (SimilarityAnalysis.cooccurrencesIDSs)
>
>
> We do currently have optimizations based on density analysis in use e.g.:
> in AtB.
>
>
> https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b4
> 5863df3e53/math-scala/src/main/scala/org/apache/mahout/
> math/scalabindings/package.scala#L431
>
>
>
> +1 to PR. thanks for pointing this out.
>
>
> --andy
>
> ________________________________
> From: Pat Ferrel <pat@occamsmachete.com>
> Sent: Monday, August 21, 2017 2:26:58 PM
> To: user@mahout.apache.org
> Subject: Re: spark-itemsimilarity scalability / Spark parallelism issues
> (SimilarityAnalysis.cooccurrencesIDSs)
>
> That looks like ancient code from the old mapreduce days. If is passes
> unit tests create a PR.
>
> Just a guess here but there are times when this might not speed up thing
> but slow them down. However for vey sparse matrixes that you might see in
> CF this could work quite well. Some of the GPU optimization will eventually
> be keyed off the density of a matrix, or selectable from knowing it’s
> characteristics.
>
> I use this code all the time and would be very interested in a version
> that works with CF style very sparse matrices.
>
> Long story short, create a PR so the optimizer guys can think through the
> implications. If I can also test it I have some large real-world data where
> I can test real-world speedup.
>
>
> On Aug 21, 2017, at 10:53 AM, Pat Ferrel <pat@occamsmachete.com> wrote:
>
> Interesting indeed. What is “massive”? Does the change pass all unit tests?
>
>
> On Aug 17, 2017, at 1:04 PM, Scruggs, Matt <matt.scruggs@bronto.com>
> wrote:
>
> Thanks for the remarks guys!
>
> I profiled the code running locally on my machine and discovered this loop
> is where these setQuick() and getQuick() calls originate (during matrix
> Kryo deserialization), and as you can see the complexity of this 2D loop
> can be very high:
>
> https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b4
> 5863df3e53/math/src/main/java/org/apache/mahout/math/
> AbstractMatrix.java#L240
>
>
> Recall that this algorithm uses SparseRowMatrix whose rows are
> SequentialAccessSparseVector, so all this looping seems unnecessary. I
> created a new subclass of SparseRowMatrix that overrides that
> assign(matrix, function) method, and instead of looping through all the
> columns of each row, it calls SequentialAccessSparseVector.iterateNonZero()
> so it only has to touch the cells with values. I also had to customize
> MahoutKryoRegistrator a bit with a new default serializer for this new
> matrix class. This yielded a massive performance boost and I verified that
> the results match exactly for several test cases and datasets. I realize
> this could have side-effects in some cases, but I'm not using any other
> part of Mahout, only SimilaritAnalysis.cooccurrencesIDSs().
>
> Any thoughts / comments?
>
>
> Matt
>
>
>
> On 8/16/17, 8:29 PM, "Ted Dunning" <ted.dunning@gmail.com> wrote:
>
> > It is common with large numerical codes that things run faster in memory
> on
> > just a few cores if the communication required outweighs the parallel
> > speedup.
> >
> > The issue is that memory bandwidth is slower than the arithmetic speed
> by a
> > very good amount. If you just have to move stuff into the CPU and munch
> on
> > it a bit it is one thing, but if you have to move the data to CPU and
> back
> > to memory to distributed it around possibly multiple times, you may wind
> up
> > with something much slower than you would have had if you were to attack
> > the problem directly.
> >
> >
> >
> > On Wed, Aug 16, 2017 at 4:47 PM, Pat Ferrel <pat@occamsmachete.com>
> wrote:
> >
> >> This uses the Mahout blas optimizing solver, which I just use and do not
> >> know well. Mahout virtualizes some things having to do with partitioning
> >> and I’ve never quite understood how they work. There is a .par() on one
> of
> >> the matrix classes that has a similar function to partition but in all
> >> cases I’ve used .par(auto) and use normal spark repartitioning based on
> >> parallelism. Mahout implements a mapBlock function, which (all things
> being
> >> equal) looks at a partition at a time in memory. The reduce is not part
> of
> >> the code I wrote.
> >>
> >> The reduce is not part of the code I wrote. Maybe someone else can
> explain
> >> what blas is doing.
> >>
> >> BTW hashmap is O(log n) on average for large n—caveats apply. We use
> >> fastutils for many things (I thought this was one case) because they are
> >> faster than JVM implementations but feel free to dig in further. We use
> >> downsampling to maintain an overall rough O(n) calculation speed where
> n =
> >> # rows (users). As the model gets more dense there are greatly
> diminishing
> >> returns for the density so after the elements per row threshold is
> reached
> >> we don’t use more in the model creation math.
> >>
> >> Still feel free to dig into what the optimizer is doing.
> >>
> >>
> >> On Aug 15, 2017, at 11:13 AM, Scruggs, Matt <matt.scruggs@bronto.com>
> >> wrote:
> >>
> >> Thanks Pat, that's good to know!
> >>
> >> This is the "reduce" step (which gets its own stage in my Spark
> >> jobs...this stage takes almost all the runtime) where most of the work
> is
> >> being done, and takes longer the more shuffle partitions there are
> >> (relative to # of CPUs):
> >>
> >> https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b4
> >> 5863df3e53/spark/src/main/scala/org/apache/mahout/
> >> sparkbindings/blas/AtA.scala#L258
> >>
> >>
> >> Why does the runtime of this reduce stage (that ultimately calls
> >> SequentialAccessSparseVector.getQuick() and setQuick() a lot) depend on
> >> the ratio of (# Spark CPUs / spark.sql.shuffle.partitions)? Essentially
> >> that ratio determines how many "chunks" of shuffle partition (reduce)
> tasks
> >> must run, and each of those chunks always takes the same amount of
> time, so
> >> the stage finishes in less time when that ratio is low (preferably 1).
> >>
> >> EXAMPLES - using 32 cores and 200 shuffle partitions, this stage
> requires
> >> ceil(145 tasks / 32 cores) = 5 "chunks" of work (145 tasks instead of
> 200
> >> because of the estimateProductPartitions call in AtA). Each chunk takes
> ~8
> >> minutes, so (5 chunks * 8 min) = ~40 mins. For a job with 32 cores and
> 32
> >> shuffle partitions (same CPU resources, still 100% utilized), this stage
> >> requires only ceil(23 tasks / 32 cores) = 1 chunk of work, which takes
> the
> >> same 8 minutes, so the job finishes ~5x faster. You can take this to the
> >> extreme with just 1 core and 1 shuffle partition, and the stage still
> takes
> >> the same amount of time! I'd love to know if you can reproduce this
> >> behavior.
> >>
> >> This goes against most advice and experience I've had with Spark, where
> >> you want to *increase* your partitioning in many cases (or at least
> leave
> >> it at the default 200, not lower it dramatically) to utilize CPUs better
> >> (and shrink each individual partition's task). There seems to be no
> >> reduction in computational complexity *per task* (within this stage I'm
> >> talking about) even with high values for spark.sql.shuffle.partitions
> (so
> >> it seems the data isn't actually being partitioned by the shuffle
> process).
> >> Refer back to the timings w/various configs in my first message.
> >>
> >> Also...is there a possibility of using a faster hash-based
> implementation
> >> instead of the setQuick() / getQuick() methods of
> >> SequentialAccessSparseVector? The javadoc on those methods mentions they
> >> shouldn't be used unless absolutely necessary due to their O(log n)
> >> complexity.
> >>
> >>
> >> Thanks for your time...this is fun stuff!
> >> Matt
> >>
> >>
> >>
> >> On 8/15/17, 10:15 AM, "Pat Ferrel" <pat@occamsmachete.com> wrote:
> >>
> >>> Great, this is the best way to use the APIs. The big win with CCO, the
> >> algo you are using is with multiple user actions. Be aware that when
> you go
> >> to this methods the input IndexedDatasets must be coerced to have
> >> compatible dimensionality, in this case the primary action defines the
> >> user-set used in calculating the model—not the one for making queries,
> >> which can use anonymous user  history. But that is for later and outside
> >> Mahout.
> >>>
> >>> 1) 4x max parallelism is a rule of thumb since the cores may not need
> >> 100% duty cycle, if they are already at 100% the 4x does no good. 2) you
> >> have found a long running task but there will always be one, if it
> weren’t
> >> this one it would be another. Different types of tasks use resources
> >> differently. For instance the collects, which must eventually use a the
> >> memory of the Driver to instantiate an in-memory data structure. There
> is
> >> no magic choice to make this work differently but it avoid several
> joins,
> >> which are much slower.
> >>>
> >>> I’m not quite sure what your question is.
> >>>
> >>>
> >>> On Aug 15, 2017, at 6:21 AM, Scruggs, Matt <matt.scruggs@bronto.com>
> >> wrote:
> >>>
> >>> Hi Pat,
> >>>
> >>> I've taken some screenshots of my Spark UI to hopefully shed some light
> >> on the behavior I'm seeing. Do you mind if I send you a link via direct
> >> email (would rather not post it here)? It's just a shared Dropbox
> folder.
> >>>
> >>>
> >>> Thanks,
> >>> Matt
> >>>
> >>>
> >>>
> >>> On 8/14/17, 11:34 PM, "Scruggs, Matt" <matt.scruggs@bronto.com> wrote:
> >>>
> >>>> I'm running a custom Scala app (distributed in a shaded jar) directly
> >> calling SimilarityAnalysis.cooccurrenceIDSs(), not using the CLI.
> >>>>
> >>>> The input data already gets explicitly repartitioned to
> spark.cores.max
> >> (defaultParallelism) in our code. I'll try increasing that by the
> factor of
> >> 4 that you suggest, but all our cores are already utilized so I'm not
> sure
> >> that will help. It gets bogged down in the post-shuffle (shuffle read /
> >> combine / reduce) phase even with all cores busy the whole time, which
> is
> >> why I've been playing around with various values for
> >> spark.sql.shuffle.partitions. The O(log n) operations I mentioned seem
> to
> >> take >95% of runtime.
> >>>>
> >>>> Thanks,
> >>>> Matt
> >>>> ________________________________
> >>>> From: Pat Ferrel <pat@occamsmachete.com>
> >>>> Sent: Monday, August 14, 2017 11:02:42 PM
> >>>> To: user@mahout.apache.org
> >>>> Subject: Re: spark-itemsimilarity scalability / Spark parallelism
> >> issues (SimilarityAnalysis.cooccurrencesIDSs)
> >>>>
> >>>> Are you using the CLI? If so it’s likely that there is only one
> >> partition of the data. If you use Mahout in the Spark shell or using it
> as
> >> a lib, do a repartition on the input data before passing it into
> >> SimilarityAnalysis.cooccurrencesIDSs. I repartition to 4*total cores to
> >> start with and set max parallelism for spark to the same. The CLI isn’t
> >> really production worthy, just for super easy experiments with CSVs.
> >>>>
> >>>>
> >>>> On Aug 14, 2017, at 2:31 PM, Scruggs, Matt <matt.scruggs@bronto.com>
> >> wrote:
> >>>>
> >>>> Howdy,
> >>>>
> >>>> I'm running SimilarityAnalysis.cooccurrencesIDSs on a fairly small
> >> dataset (about 870k [user, item] rows in the primary action IDS…no cross
> >> co-occurrence IDS) and I noticed it scales strangely. This is with
> Mahout
> >> 0.13.0 although the same behavior happens in 0.12.x as well (haven't
> tested
> >> it before that).
> >>>>
> >>>> TLDR - regardless of the Spark parallelism (CPUs) I throw at this
> >> routine, every Spark task within the final / busy stage seems to take
> the
> >> same amount of time, which leads me to guess that every shuffle
> partition
> >> contains the same amount of data (perhaps the full dataset matrix in
> >> shape/size, albeit with different values). I'm reaching out to see if
> this
> >> is a known algorithmic complexity issue in this routine, or if my
> config is
> >> to blame (or both).
> >>>>
> >>>> Regarding our hardware, we have identical physical machines in a Mesos
> >> cluster with 6 workers and a few masters. Each worker has ~500GB of
> SSD, 32
> >> cores and 128g RAM. We run lots of Spark jobs and have generally ironed
> out
> >> the kinks in terms of hardware and cluster config, so I don't suspect
> any
> >> hardware-related issues.
> >>>>
> >>>> Here are some timings for SimilarityAnalysis.cooccurrencesIDSs on
> this
> >> dataset with maxNumInteractions = 500, maxInterestingItemsPerThing = 20,
> >> randomSeed = default, parOpts = default (there's lots of other Spark
> >> config, this is just what I'm varying to check for effects). In
> particular,
> >> notice how the ratio of (spark.sql.shuffle.partitions / spark.cores.max)
> >> affects the runtime:
> >>>>
> >>>> * 8 executors w/8 cores each, takes about 45 minutes
> >>>> * note that spark.sql.shuffle.partitions > spark.cores.max
> >>>> spark.cores.max = 64
> >>>> spark.executor.cores = 8
> >>>> spark.sql.shuffle.partitions = 200 (default)
> >>>>
> >>>> * 1 executors w/24 cores, takes about 65 minutes
> >>>> * note that spark.sql.shuffle.partitions >>> spark.cores.max
> >>>> spark.cores.max = 24
> >>>> spark.executor.cores = 24
> >>>> spark.sql.shuffle.partitions = 200 (default)
> >>>>
> >>>> * 1 executor w/8 cores, takes about 8 minutes
> >>>> * note that spark.sql.shuffle.partitions = spark.cores.max
> >>>> spark.cores.max = 8
> >>>> spark.executor.cores = 8 (1 executor w/8 cores)
> >>>> spark.sql.shuffle.partitions = 8
> >>>>
> >>>> * 1 executor w/24 cores, takes about 8 minutes (same as 8 cores!)
> >>>> * note that spark.sql.shuffle.partitions = spark.cores.max
> >>>> spark.cores.max = 24
> >>>> spark.executor.cores = 24 (1 executor w/24 cores)
> >>>> spark.sql.shuffle.partitions = 24
> >>>>
> >>>> * 32 executors w/2 cores each, takes about 8 minutes (same as 8
> cores!)
> >>>> * note that spark.sql.shuffle.partitions = spark.cores.max
> >>>> spark.cores.max = 64
> >>>> spark.executor.cores = 2
> >>>> spark.sql.shuffle.partitions = 88 (results in 64 tasks for final
> stage)
> >>>>
> >>>> Adjusting the "maxNumInteractions" parameter down to 100 and 50
> results
> >> in a minor improvement (5-10%). I've also played around with removing
> >> [user, item] rows from the input dataset for users with only 1
> >> interaction…I read to try that in another thread…that yielded maybe a
> >> 40-50% speed improvement, but I'd rather not toss out data (unless it
> truly
> >> is totally useless, of course :D ).
> >>>>
> >>>> When I look at the thread dump within the Spark UI's Executors ->
> >> thread dump pages, it seems all the executors are very busy in the code
> >> pasted below for >95% of the run. GC throughput is very good so we're
> not
> >> bogged down there...it's just super busy doing running the code below.
> I am
> >> intrigued about the comments on the SequentialAccessSparseVector
> methods I
> >> see being called (getQuick and setQuick), which state they take O(log n)
> >> time (https://github.com/apache/mahout/blob/
> 08e02602e947ff945b9bd73ab5f0b4
> >> 5863df3e53/math/src/main/java/org/apache/mahout/math/
> >> SequentialAccessSparseVector.java).
> >>>>
> >>>>
> >>>> Thanks all for your time and feedback!
> >>>>
> >>>> Matt Scruggs
> >>>>
> >>>> org.apache.mahout.math.OrderedIntDoubleMapping.find(
> >> OrderedIntDoubleMapping.java:105)
> >>>> org.apache.mahout.math.OrderedIntDoubleMapping.get(
> >> OrderedIntDoubleMapping.java:110)
> >>>> org.apache.mahout.math.SequentialAccessSparseVector.getQuick(
> >> SequentialAccessSparseVector.java:157)
> >>>> org.apache.mahout.math.SparseRowMatrix.getQuick(
> >> SparseRowMatrix.java:90)
> >>>> org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
> >>>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$
> >> eq(MatrixOps.scala:45)
> >>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:
> >> 258)
> >>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:
> >> 258)
> >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$
> >> anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
> >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$
> >> anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
> >>>> org.apache.spark.util.collection.AppendOnlyMap.
> >> changeValue(AppendOnlyMap.scala:144)
> >>>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.
> changeValue(
> >> SizeTrackingAppendOnlyMap.scala:32)
> >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(
> >> ExternalAppendOnlyMap.scala:163)
> >>>> org.apache.spark.Aggregator.combineCombinersByKey(
> Aggregator.scala:50)
> >>>> org.apache.spark.shuffle.BlockStoreShuffleReader.read(
> >> BlockStoreShuffleReader.scala:85)
> >>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(
> >> ShuffleMapTask.scala:79)
> >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(
> >> ShuffleMapTask.scala:47)
> >>>> org.apache.spark.scheduler.Task.run(Task.scala:86)
> >>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
> >> ThreadPoolExecutor.java:1142)
> >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> >> ThreadPoolExecutor.java:617)
> >>>> java.lang.Thread.run(Thread.java:745)
> >>>>
> >>>> ……or this code……
> >>>>
> >>>> org.apache.mahout.math.SparseRowMatrix.setQuick(
> >> SparseRowMatrix.java:105)
> >>>> org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
> >>>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$
> >> eq(MatrixOps.scala:45)
> >>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:
> >> 258)
> >>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:
> >> 258)
> >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$
> >> anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
> >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$
> >> anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
> >>>> org.apache.spark.util.collection.AppendOnlyMap.
> >> changeValue(AppendOnlyMap.scala:144)
> >>>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.
> changeValue(
> >> SizeTrackingAppendOnlyMap.scala:32)
> >>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(
> >> ExternalAppendOnlyMap.scala:163)
> >>>> org.apache.spark.Aggregator.combineCombinersByKey(
> Aggregator.scala:50)
> >>>> org.apache.spark.shuffle.BlockStoreShuffleReader.read(
> >> BlockStoreShuffleReader.scala:85)
> >>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(
> >> MapPartitionsRDD.scala:38)
> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(
> >> ShuffleMapTask.scala:79)
> >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(
> >> ShuffleMapTask.scala:47)
> >>>> org.apache.spark.scheduler.Task.run(Task.scala:86)
> >>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
> >> ThreadPoolExecutor.java:1142)
> >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> >> ThreadPoolExecutor.java:617)
> >>>> java.lang.Thread.run(Thread.java:745)
> >>>>
> >>>
> >>
> >>
>
>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message