crunch-user mailing list archives

From Andrey Gusev <and...@siftscience.com>
Subject Re: Secondary sort and partitioning in Spark
Date Thu, 10 Dec 2015 20:23:09 GMT
Yeah, so that appears to be the symptom. I tried both 0.13 and 0.14 (just built
from source), but the problem is present in both cases.

When I use 2 partitions and, say, 100 total groups, the first partition
starts at group 0 and runs through group 47, sort field index 344.

Roughly, the log line is:

groupId:47, sortFieldIndex: 344


The next partition, executing on a different node, starts at:

groupId: 47, sortFieldIndex: 345


So group 47 is split across the two partitions.
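The log lines above are consistent with the sorted shuffle output being cut into equal-size chunks without regard to group boundaries; that is only a guess at the behavior, not something confirmed in Crunch or Spark. A small plain-Java illustration of that guess (not Crunch or Spark code; `EqualSplitDemo` and its methods are hypothetical) shows how such a cut splits exactly the group that straddles the boundary:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class EqualSplitDemo {

    // One shuffled record: the grouping field and the secondary sort field.
    static final class Rec {
        final int groupId;
        final int sortIndex;

        Rec(int groupId, int sortIndex) {
            this.groupId = groupId;
            this.sortIndex = sortIndex;
        }
    }

    // Builds records already sorted by (groupId, sortIndex); group g gets sizes[g] records.
    static List<Rec> sortedRecords(int[] sizes) {
        List<Rec> out = new ArrayList<>();
        for (int g = 0; g < sizes.length; g++) {
            for (int s = 0; s < sizes[g]; s++) {
                out.add(new Rec(g, s));
            }
        }
        return out;
    }

    // Cuts the sorted list into numPartitions contiguous chunks of (nearly) equal
    // size, ignoring group boundaries, and returns the groupIds that straddle a cut.
    static Set<Integer> groupsSplitByEqualChunks(List<Rec> sorted, int numPartitions) {
        Set<Integer> split = new LinkedHashSet<>();
        int n = sorted.size();
        for (int p = 1; p < numPartitions; p++) {
            int cut = (int) ((long) n * p / numPartitions); // first index of chunk p
            if (cut > 0 && cut < n && sorted.get(cut - 1).groupId == sorted.get(cut).groupId) {
                split.add(sorted.get(cut).groupId);
            }
        }
        return split;
    }

    public static void main(String[] args) {
        // Three groups of 3, 4 and 3 records, cut into two halves of 5 records each.
        List<Rec> sorted = sortedRecords(new int[] {3, 4, 3});
        System.out.println("groups split: " + groupsSplitByEqualChunks(sorted, 2)); // groups split: [1]
    }
}
```

Whether a group is split depends only on where the cut lands: if the boundary happens to fall between two groups, nothing is split, which would also explain why the problem shows up as "exactly one group" with 2 partitions.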

I do see that PartitionedMapOutputFunction is used in PGroupedTableImpl
(Spark), so it looks like the fix for CRUNCH-556
<https://issues.apache.org/jira/browse/CRUNCH-556> isn't sufficient.
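For comparison, the contract a secondary sort needs can be sketched in plain Java. This is not Crunch's PartitionedMapOutputFunction or partitioner code; `SecondarySortSketch`, `partitionFor`, `shuffleSortGroup` and the int sort key are hypothetical stand-ins. The idea: route each record by the group part of its composite key only, sort each partition by (group, sort key), then hand each run of equal group keys to the per-group function in one call.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class SecondarySortSketch {

    // Stand-in for GroupByKey; equals/hashCode cover both fields,
    // so equal keys always hash to the same partition.
    static final class GroupByKey {
        final int treeId;
        final int nodeId;

        GroupByKey(int treeId, int nodeId) {
            this.treeId = treeId;
            this.nodeId = nodeId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof GroupByKey)) return false;
            GroupByKey k = (GroupByKey) o;
            return treeId == k.treeId && nodeId == k.nodeId;
        }

        @Override
        public int hashCode() {
            return 31 * treeId + nodeId;
        }
    }

    // One shuffled record with composite key (group, sortKey); the int sortKey
    // is a hypothetical stand-in for SortKey.
    static final class Rec {
        final GroupByKey group;
        final int sortKey;

        Rec(GroupByKey group, int sortKey) {
            this.group = group;
            this.sortKey = sortKey;
        }
    }

    // Partitioner: hashes ONLY the group part of the composite key.
    static int partitionFor(Rec r, int numPartitions) {
        return Math.floorMod(r.group.hashCode(), numPartitions);
    }

    // Simulates shuffle, per-partition sort and grouping. Each returned entry is
    // one "call" of the per-group function: a group key and its sorted sort keys.
    static List<Map.Entry<GroupByKey, List<Integer>>> shuffleSortGroup(List<Rec> input, int numPartitions) {
        List<List<Rec>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) partitions.add(new ArrayList<>());
        for (Rec r : input) partitions.get(partitionFor(r, numPartitions)).add(r);

        // Full sort order: group fields first, then the secondary sort field.
        Comparator<Rec> order = Comparator.comparingInt((Rec r) -> r.group.treeId)
                .thenComparingInt(r -> r.group.nodeId)
                .thenComparingInt(r -> r.sortKey);

        List<Map.Entry<GroupByKey, List<Integer>>> calls = new ArrayList<>();
        for (List<Rec> part : partitions) {
            part.sort(order);
            GroupByKey current = null;
            List<Integer> values = null;
            for (Rec r : part) {
                if (!r.group.equals(current)) { // a new run of equal group keys starts
                    current = r.group;
                    values = new ArrayList<>();
                    calls.add(new AbstractMap.SimpleEntry<>(current, values));
                }
                values.add(r.sortKey);
            }
        }
        return calls;
    }

    // Demo input: three groups, each added in descending sort-key order.
    static List<Rec> demoInput() {
        List<Rec> in = new ArrayList<>();
        int[][] groups = {{0, 0, 100}, {0, 1, 50}, {1, 0, 30}}; // treeId, nodeId, size
        for (int[] g : groups) {
            for (int s = g[2] - 1; s >= 0; s--) in.add(new Rec(new GroupByKey(g[0], g[1]), s));
        }
        return in;
    }

    public static void main(String[] args) {
        for (Map.Entry<GroupByKey, List<Integer>> call : shuffleSortGroup(demoInput(), 2)) {
            GroupByKey k = call.getKey();
            System.out.println("tree=" + k.treeId + " node=" + k.nodeId + " records=" + call.getValue().size());
        }
    }
}
```

Because the partition is a function of the group key alone, a group can never straddle two partitions, no matter how unevenly the records are distributed. The cost is that partitions may end up with very different record counts.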





On Wed, Dec 9, 2015 at 8:56 PM, Josh Wills <josh.wills@gmail.com> wrote:

> Hrm, so you're saying records for the same GroupByKey are ending up in
> different partitions when you're doing a secondary sort? Sounds like a bug
> in the SparkPartitioner we're using. I wonder if it's the same bug that
> was fixed here?
>
> https://issues.apache.org/jira/browse/CRUNCH-556
>
> On Wed, Dec 9, 2015 at 6:05 PM, Andrey Gusev <andrey@siftscience.com>
> wrote:
>
>> Hello crunch!
>>
>> I am running into problems with how groups are partitioned when running a
>> secondary sort on SparkPipeline.
>>
>> What I am observing is that records belonging to a single group may be
>> split across two or more calls to the DoFn. This could be a gap in my
>> understanding of the Spark execution model with respect to locality; if so,
>> can *all* the records belonging to a groupBy key be forced into a single call?
>>
>> Roughly speaking, the code looks like this:
>>
>> PTableType<GroupByKey, Pair<SortKey, Info>> pType =
>>     Writables.tableOf(Writables.writables(GroupByKey.class),
>>         Writables.pairs(Writables.writables(SortKey.class),
>>             Writables.writables(Info.class)));
>>
>> // note that dataset has been explicitly sharded by numPartitions
>> PTable<GroupByKey, Pair<SortKey, Info>> infos =
>>     dataset.parallelDo(..., pType);
>>
>> // mergeInfos(...) builds the DoFn applied to each group;
>> // mergeType is the PTableType<SortKey, Info> of the output
>> PTable<SortKey, Info> mergedInfos =
>>     SecondarySort.sortAndApply(infos, mergeInfos(...),
>>         mergeType, numPartitions);
>>
>> static class GroupByKey implements Writable {
>>
>>   public int treeId;
>>   public int nodeId;
>>   ...
>> }
>>
>> I can confirm that records come in sorted and grouped, but I am also
>> observing that a single group may be processed on different nodes. More
>> concretely, let's say the group for treeId=0, nodeId=0 has 100 records:
>> the first 30 may show up on node1 and the remaining 70 on node2 (sorted in
>> both cases). Informally, it looks as if the records are divided so that each
>> node processes the same number of records. This is especially evident with
>> 2 partitions, where exactly one group is split.
>>
>> The semantics of the code (at least for now) require all the values of a
>> group to arrive in a single call. Can that be forced?
>>
>> env: Spark 1.5 and Crunch 0.11.0
>>
>> Any thoughts would be appreciated!
>>
>
>
