David, Tom,

Thanks for the explanation. This confirms my suspicion that the executor was holding on to memory  regardless of tasks in execution once it expands to occupy memory in keeping with spark.executor.memory. There certainly is scope for improvement here, though I realize there will substantial overheads in implementing memory release without compromising RDD caching and similar aspects. I'll explore alternatives / workarounds meanwhile.

Thanks,
Bharath


On Sat, Oct 17, 2015 at 3:33 PM, Tom Arnfeld <tom@duedil.com> wrote:
Hi Bharath,

When running jobs in fine grained mode, each Spark task is sent to mesos as a task which allows the offers system to maintain fairness between different spark application (as you've described). Having said that, unless your memory per-node is hugely undersubscribed when running these jobs in parallel. This behaviour matches exactly what you've described.

What you're seeing happens because even though there's a new mesos task for each Spark task (allowing CPU to be shared) the Spark executors don't get killed even when they aren't doing any work, which means the memory isn't released. The JVM doesn't allow for flexible memory re-allocation (as far as i'm aware) which make it impossible for spark to dynamically scale up the memory of the executor over time as tasks start and finish.

As Dave pointed out, the simplest way to solve this is to use a higher level tool that can run your spark jobs through one mesos framework and then you can let spark distribute the resources more effectively.

I hope that helps!

Tom.

On 17 Oct 2015, at 06:47, Bharath Ravi Kumar <reachbach@gmail.com> wrote:

Can someone respond if you're aware of the reason for such a memory footprint? It seems unintuitive and hard to reason about.

Thanks,
Bharath

On Thu, Oct 15, 2015 at 12:29 PM, Bharath Ravi Kumar <reachbach@gmail.com> wrote:
Resending since user@mesos bounced earlier. My apologies.

On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar <reachbach@gmail.com> wrote:
(Reviving this thread since I ran into similar issues...)

I'm running two spark jobs (in mesos fine grained mode), each belonging to a different mesos role, say low and high. The low:high mesos weights are 1:10. On expected lines, I see that the low priority job occupies cluster resources to the maximum extent when running alone. However, when the high priority job is submitted, it does not start and continues to await cluster resources (as seen in the logs). Since the jobs run in fine grained mode and the low priority tasks begin to finish, the high priority job should ideally be able to start and gradually take over cluster resources as per the weights. However, I noticed that while the "low" job gives up CPU cores with each completing task (e.g. reduction from 72 -> 12 with default parallelism set to 72), the memory resources are held on (~500G out of 768G). The spark.executor.memory setting appears to directly impact the amount of memory that the job holds on to. In this case, it was set to 200G in the low priority task and 100G in the high priority task. The nature of these jobs is such that setting the numbers to smaller values (say 32g) resulted in job failures with outofmemoryerror.  It appears that the spark framework is retaining memory (across tasks)  proportional to spark.executor.memory for the duration of the job and not releasing memory as tasks complete. This defeats the purpose of fine grained mode execution as the memory occupancy is preventing the high priority job from accepting the prioritized cpu offers and beginning execution. Can this be explained / documented better please?

Thanks,
Bharath

On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen <tim@mesosphere.io> wrote:
(Adding spark user list)

Hi Tom,

If I understand correctly you're saying that you're running into memory problems because the scheduler is allocating too much CPUs and not enough memory to acoomodate them right?

In the case of fine grain mode I don't think that's a problem since we have a fixed amount of CPU and memory per task. 
However, in coarse grain you can run into that problem if you're with in the spark.cores.max limit, and memory is a fixed number.

I have a patch out to configure how much max cpus should coarse grain executor use, and it also allows multiple executors in coarse grain mode. So you could say try to launch multiples of max 4 cores with spark.executor.memory (+ overhead and etc) in a slave. (https://github.com/apache/spark/pull/4027)

It also might be interesting to include a cores to memory multiplier so that with a larger amount of cores we try to scale the memory with some factor, but I'm not entirely sure that's intuitive to use and what people know what to set it to, as that can likely change with different workload.

Tim







On Sat, Apr 11, 2015 at 9:51 AM, Tom Arnfeld <tom@duedil.com> wrote:
We're running Spark 1.3.0 (with a couple of patches over the top for docker related bits).

I don't think SPARK-4158 is related to what we're seeing, things do run fine on the cluster, given a ridiculously large executor memory configuration. As for SPARK-3535 although that looks useful I think we'e seeing something else.

Put a different way, the amount of memory required at any given time by the spark JVM process is directly proportional to the amount of CPU it has, because more CPU means more tasks and more tasks means more memory. Even if we're using coarse mode, the amount of executor memory should be proportionate to the amount of CPUs in the offer.

On 11 April 2015 at 17:39, Brenden Matthews <brenden@diddyinc.com> wrote:
I ran into some issues with it a while ago, and submitted a couple PRs to fix it:


Do these look relevant? What version of Spark are you running?

On Sat, Apr 11, 2015 at 9:33 AM, Tom Arnfeld <tom@duedil.com> wrote:

Hey,

Not sure whether it's best to ask this on the spark mailing list or the mesos one, so I'll try here first :-)

I'm having a bit of trouble with out of memory errors in my spark jobs... it seems fairly odd to me that memory resources can only be set at the executor level, and not also at the task level. For example, as far as I can tell there's only a spark.executor.memory config option.

Surely the memory requirements of a single executor are quite dramatically influenced by the number of concurrent tasks running? Given a shared cluster, I have no idea what % of an individual slave my executor is going to get, so I basically have to set the executor memory to a value that's correct when the whole machine is in use...

Has anyone else running Spark on Mesos come across this, or maybe someone could correct my understanding of the config options?

Thanks!

Tom.