crunch-user mailing list archives

From Josh Wills <josh.wi...@gmail.com>
Subject Re: Secondary sort and partitioning in Spark
Date Thu, 10 Dec 2015 04:56:11 GMT
Hrm-- so you're saying records for the same GroupByKey are ending up in
different partitions when you're doing a secondary sort? Sounds like a bug
in the SparkPartitioner we're using-- I wonder if it was the same bug that
was fixed here?

https://issues.apache.org/jira/browse/CRUNCH-556
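
To spell out the invariant: for a secondary sort the shuffle key is effectively
(grouping key, sort key), and the partitioner has to compute the partition from
the grouping key alone. A rough sketch of that contract (a hypothetical
illustration, not the actual SparkPartitioner code; GroupByKey/SortKey are the
key classes from your snippet below):

import org.apache.crunch.Pair;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch only: the partition is a function of the grouping component alone,
// so every record of a group lands in the same partition regardless of its
// SortKey.
public class GroupKeyOnlyPartitioner
    extends Partitioner<Pair<GroupByKey, SortKey>, Object> {
  @Override
  public int getPartition(Pair<GroupByKey, SortKey> key, Object value,
                          int numPartitions) {
    // Hash only the grouping key; ignore key.second(), the sort key.
    return (key.first().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

If the partitioner (or the composite key's hashCode) mixes in the sort key, you
get exactly the split-group behavior you're describing.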

On Wed, Dec 9, 2015 at 6:05 PM, Andrey Gusev <andrey@siftscience.com> wrote:

> Hello crunch!
>
> I am running into problems with the partitioning of groups when using
> secondary sort on a SparkPipeline.
>
> What I am observing is that records belonging to a single group may be
> split across two or more calls to the DoFn. This could be a gap in my
> understanding of the Spark execution model with respect to locality - and
> if so, can *all* the records belonging to a groupBy key be forced into a
> single call?
>
> Roughly speaking the code looks like this:
>
> PTableType<GroupByKey, Pair<SortKey, Info>> pType =
>     Writables.tableOf(Writables.writables(GroupByKey.class),
>         Writables.pairs(Writables.writables(SortKey.class),
>             Writables.writables(Info.class)));
>
> // note that dataset has been explicitly sharded by numPartitions
> PTable<GroupByKey, Pair<SortKey, Info>> infos =
>     dataset.parallelDo(..., pType);
>
> PTable<SortKey, Info> mergedInfos =
>     SecondarySort.sortAndApply(infos, mergeInfos(...),
>         mergeType, numPartitions);
>
> static class GroupByKey implements Writable {
>   public int treeId;
>   public int nodeId;
>   ...
> }
>
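> For reference, the DoFn that mergeInfos(...) returns has this shape (a
> simplified sketch; the constructor args and the real merge logic are
> elided):
>
> static DoFn<Pair<GroupByKey, Iterable<Pair<SortKey, Info>>>, Pair<SortKey, Info>>
>     mergeInfos() {
>   return new DoFn<Pair<GroupByKey, Iterable<Pair<SortKey, Info>>>, Pair<SortKey, Info>>() {
>     @Override
>     public void process(Pair<GroupByKey, Iterable<Pair<SortKey, Info>>> group,
>                         Emitter<Pair<SortKey, Info>> emitter) {
>       // Expectation: called once per GroupByKey, with group.second() holding
>       // *all* of that group's (SortKey, Info) records in SortKey order.
>       for (Pair<SortKey, Info> rec : group.second()) {
>         emitter.emit(rec); // real merge logic elided
>       }
>     }
>   };
> }
>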
> I can confirm that records come in sorted and grouped, but I am also
> observing that a single group may be processed on different nodes. More
> concretely, let's say the group for treeId=0, nodeId=0 has 100 records: the
> first 30 may show up on node1 and the remaining 70 on node2 (sorted in both
> cases). Informally, it looks like the scheduling ensures that each node
> processes roughly the same number of records. It's especially evident with
> 2 partitions, where exactly one group is split.
>
> The semantics of the code (at least for now) require all the values for a
> group to arrive in a single call. Can that be forced?
>
> env: spark 1.5 and crunch 0.11.0
>
> Any thoughts would be appreciated!
>
