spark-user mailing list archives

From Aaron Davidson <ilike...@gmail.com>
Subject Re: data locality, task distribution
Date Thu, 13 Nov 2014 18:20:52 GMT
You mentioned that the 3.1 min run was the one that did the actual caching,
so did that run before any data was cached, or after?

I would recommend checking the Storage tab of the UI, and clicking on the
RDD, to see how full the executors' storage memory is (which may be
significantly less than the instance's memory). When a task completes over
data that should be cached, it will try to cache it, so it's pretty weird
that you're seeing <100% cache with memory to spare. It's possible that
some partitions are significantly larger than others, which may cause us to
not attempt to cache them (governed by spark.storage.unrollFraction).
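
To check the "some partitions are much larger than others" theory, one
rough approach is to compare each partition's size against the median. The
sketch below is only an illustration: the helper name, the factor of 3, and
the sample counts are all made up, and it assumes record counts are a
reasonable proxy for partition size in memory.

```python
from statistics import median


def oversized_partitions(sizes, factor=3.0):
    """Return (index, size) pairs for partitions larger than factor * median.

    `sizes` is a list of per-partition sizes (record counts or bytes).
    `factor` is an arbitrary illustrative threshold, not a Spark setting.
    """
    if not sizes:
        return []
    typical = median(sizes)
    return [(i, s) for i, s in enumerate(sizes) if s > factor * typical]


# Hypothetical per-partition record counts. With PySpark they could be
# collected with something like:
#   rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
sizes = [1000, 1100, 950, 9000, 1050, 990]
print(oversized_partitions(sizes))  # [(3, 9000)]
```

If this reports nothing, partition size skew is probably not what is keeping
blocks out of the cache.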

You can also try increasing the spark.locality.wait setting so that Spark
will wait longer for a local executor to free up before running tasks
non-locally. One possible situation is that a node hits GC for a few
seconds, a task is scheduled non-locally, and then attempting to read from
the other executors' cache is much more expensive than computing the data
initially. Increasing the locality wait beyond the GC time would avoid this.
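
The GC scenario can be pictured with a toy model. This is not Spark's
scheduler: the function name and the numbers are invented for illustration,
and it collapses the intermediate locality levels (such as NODE_LOCAL) into
a single local-or-not decision for one pending task.

```python
def chosen_locality(local_busy_for, locality_wait):
    """Toy model of delay scheduling for a single pending task.

    local_busy_for: seconds until the executor holding the cached partition
                    can accept a task again (e.g. the length of a GC pause).
    locality_wait:  seconds the scheduler is willing to wait for that
                    executor before handing the task to any free executor.
    """
    if local_busy_for <= locality_wait:
        return "PROCESS_LOCAL"
    return "ANY"


gc_pause = 5.0  # illustrative pause length in seconds
for wait in (3.0, 10.0):
    print(wait, chosen_locality(gc_pause, wait))
# 3.0 ANY
# 10.0 PROCESS_LOCAL
```

In this simplified picture, a wait shorter than the pause pushes the task to
a remote executor, while a wait longer than the pause keeps it local.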

On Thu, Nov 13, 2014 at 9:08 AM, Nathan Kronenfeld <
nkronenfeld@oculusinfo.com> wrote:

> I am seeing skewed execution times.  As far as I can tell, they are
> attributable to differences in data locality - tasks with locality
> PROCESS_LOCAL run fast, NODE_LOCAL, slower, and ANY, slowest.
>
> This seems entirely as it should be - the question is, why the different
> locality levels?
>
> I am seeing skewed caching, as I mentioned before - in the case I
> isolated, with 4 nodes, they were distributed at about 42%, 31%, 20%, and
> 6%.  However, the total amount was significantly less than the memory of
> any single node, so I don't think they could have overpopulated their
> cache.  I am occasionally seeing task failures, but they re-execute
> themselves, and work fine the next time.  Yet I'm still seeing incomplete
> caching (from 65% cached up to 100%, depending on the run).
>
> I shouldn't have much variance in task time - this is simply a foreach
> over the data, adding to an accumulator, and the data is completely
> randomly distributed, so should be pretty even overall.
>
> I am seeing GC regressions occasionally - they slow a request from about 2
> seconds to about 5 seconds.  The 8 minute slowdown seems to be solely
> attributable to the data locality issue, as far as I can tell.  There was
> some further confusion though in the times I mentioned - the list I gave
> (3.1 min, 2 seconds, ... 8 min) were not different runs with different
> cache %s, they were iterations within a single run with 100% caching.
>
>                        -Nathan
>
>
>
> On Thu, Nov 13, 2014 at 1:45 AM, Aaron Davidson <ilikerps@gmail.com>
> wrote:
>
>> Spark's scheduling is pretty simple: it will allocate tasks to open cores
>> on executors, preferring ones where the data is local. It even performs
>> "delay scheduling", which means waiting a bit to see if an executor where
>> the data resides locally becomes available.
>>
>> Are your tasks seeing very skewed execution times? If some tasks are
>> taking a very long time and using all the resources on a node, perhaps the
>> other nodes are quickly finishing many tasks, and actually overpopulating
>> their caches. If a particular machine were not overpopulating its cache,
>> and there are no failures, then you should see 100% cached after the first
>> run.
>>
>> It's also strange that running totally uncached takes 3.1 minutes, but
>> running 80-90% cached may take 8 minutes. Does your workload produce
>> nondeterministic variance in task times? Was it a single straggler, or many
>> tasks, that was keeping the job from finishing? It's not too uncommon to
>> see occasional performance regressions while caching due to GC, though 2
>> seconds to 8 minutes is a bit extreme.
>>
>> On Wed, Nov 12, 2014 at 9:01 PM, Nathan Kronenfeld <
>> nkronenfeld@oculusinfo.com> wrote:
>>
>>> Sorry, I think I was not clear in what I meant.
>>> I didn't mean it went down within a run, with the same instance.
>>>
>>> I meant I'd run the whole app, and one time, it would cache 100%, and
>>> the next run, it might cache only 83%
>>>
>>> Within a run, it doesn't change.
>>>
>>> On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson <ilikerps@gmail.com>
>>> wrote:
>>>
>>>> The fact that the caching percentage went down is highly suspicious. It
>>>> should generally not decrease unless other cached data took its place,
>>>> or unless executors were dying. Do you know if either of these was the
>>>> case?
>>>>
>>>> On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld <
>>>> nkronenfeld@oculusinfo.com> wrote:
>>>>
>>>>> Can anyone point me to a good primer on how spark decides where to
>>>>> send what task, how it distributes them, and how it determines data
>>>>> locality?
>>>>>
>>>>> I'm trying a pretty simple task - it's doing a foreach over cached
>>>>> data, accumulating some (relatively complex) values.
>>>>>
>>>>> So I see several inconsistencies I don't understand:
>>>>>
>>>>> (1) If I run it a couple times, as separate applications (i.e.,
>>>>> reloading, recaching, etc), I will get different %'s cached each time.
>>>>> I've got about 5x as much memory as I need overall, so it isn't running
>>>>> out.  But one time, 100% of the data will be cached; the next, 83%, the
>>>>> next, 92%, etc.
>>>>>
>>>>> (2) Also, the data is very unevenly distributed. I've got 400
>>>>> partitions, and 4 workers (with, I believe, 3x replication), and on my
>>>>> last run, my distribution was 165/139/25/71.  Is there any way to get
>>>>> spark to distribute the tasks more evenly?
>>>>>
>>>>> (3) If I run the problem several times in the same execution (to take
>>>>> advantage of caching etc.), I get very inconsistent results.  My latest
>>>>> try, I get:
>>>>>
>>>>>    - 1st run: 3.1 min
>>>>>    - 2nd run: 2 seconds
>>>>>    - 3rd run: 8 minutes
>>>>>    - 4th run: 2 seconds
>>>>>    - 5th run: 2 seconds
>>>>>    - 6th run: 6.9 minutes
>>>>>    - 7th run: 2 seconds
>>>>>    - 8th run: 2 seconds
>>>>>    - 9th run: 3.9 minutes
>>>>>    - 10th run: 8 seconds
>>>>>
>>>>> I understand the difference for the first run; it was caching that
>>>>> time.  Later times, when it manages to work in 2 seconds, it's because
>>>>> all the tasks were PROCESS_LOCAL; when it takes longer, the last 10-20%
>>>>> of the tasks end up with locality level ANY.  Why would that change when
>>>>> running the exact same task twice in a row on cached data?
>>>>>
>>>>> Any help or pointers that I could get would be much appreciated.
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>>                  -Nathan
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Nathan Kronenfeld
>>>>> Senior Visualization Developer
>>>>> Oculus Info Inc
>>>>> 2 Berkeley Street, Suite 600,
>>>>> Toronto, Ontario M5A 4J5
>>>>> Phone:  +1-416-203-3003 x 238
>>>>> Email:  nkronenfeld@oculusinfo.com
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>
