crunch-user mailing list archives

From: Andrey Gusev <and...@siftscience.com>
Subject: Secondary sort and partitioning in Spark
Date: Thu, 10 Dec 2015 02:05:15 GMT
Hello crunch!

I am running into problems with how groups are partitioned when using
secondary sort on a SparkPipeline.

What I am observing is that records belonging to a single group may be
split across two or more invocations of the DoFn. This could be a gap in my
understanding of the Spark execution model with respect to locality; if so,
can *all* the records belonging to a groupBy key be forced into a single
call?

Roughly speaking, the code looks like this:

PTableType<GroupByKey, Pair<SortKey, Info>> pType =
    Writables.tableOf(Writables.writables(GroupByKey.class),
        Writables.pairs(Writables.writables(SortKey.class),
            Writables.writables(Info.class)));

// note that the dataset has been explicitly sharded into numPartitions
PTable<GroupByKey, Pair<SortKey, Info>> infos =
    dataset.parallelDo(..., pType);

PTable<SortKey, Info> mergedInfos =
    SecondarySort.sortAndApply(infos, mergeInfos(...), mergeType,
        numPartitions);

static class GroupByKey implements Writable {
  public int treeId;
  public int nodeId;
  ...
}
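
For completeness, mergeInfos(...) builds a DoFn shaped roughly like the
sketch below (argument list and the actual merge logic omitted, Crunch
imports of DoFn/Emitter/Pair omitted; the pass-through emit is just a
placeholder):

static DoFn<Pair<GroupByKey, Iterable<Pair<SortKey, Info>>>,
            Pair<SortKey, Info>> mergeInfos() {
  return new DoFn<Pair<GroupByKey, Iterable<Pair<SortKey, Info>>>,
                  Pair<SortKey, Info>>() {
    @Override
    public void process(Pair<GroupByKey, Iterable<Pair<SortKey, Info>>> input,
                        Emitter<Pair<SortKey, Info>> emitter) {
      // I expected exactly one call per GroupByKey, with the Iterable
      // holding all records for that group in SortKey order. On
      // SparkPipeline I instead see multiple calls for the same key,
      // each carrying a subset of the group.
      for (Pair<SortKey, Info> record : input.second()) {
        emitter.emit(record); // placeholder; real code merges before emitting
      }
    }
  };
}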

I can confirm that records arrive sorted and grouped, but I am also
observing that a single group may be processed on different nodes. More
concretely, let's say the group belonging to treeId=0, nodeId=0 has 100
records: the first 30 may show up on node1 and the remaining 70 on node2
(sorted in both cases). Informally, it looks as if the scheduler balances
the record count across nodes rather than respecting group boundaries.
This is especially evident with 2 partitions, where exactly one group is
split.

The semantics of the code (at least for now) require all the values for a
group to arrive in a single call. Can that be forced?
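
If sortAndApply can't guarantee that, the fallback I've been considering is
a plain groupByKey with explicit GroupingOptions, buffering and sorting each
group in memory inside the DoFn. This is just a sketch (untested): it
assumes SortKey implements Comparable<SortKey>, that a whole group fits in
memory, and it uses Guava's Lists plus java.util.{List, Collections,
Comparator}:

PTable<SortKey, Info> mergedInfos = infos
    .groupByKey(GroupingOptions.builder().numReducers(numPartitions).build())
    .parallelDo(new DoFn<Pair<GroupByKey, Iterable<Pair<SortKey, Info>>>,
                         Pair<SortKey, Info>>() {
      @Override
      public void process(Pair<GroupByKey, Iterable<Pair<SortKey, Info>>> input,
                          Emitter<Pair<SortKey, Info>> emitter) {
        // buffer and sort the group ourselves instead of relying on
        // secondary sort
        List<Pair<SortKey, Info>> records = Lists.newArrayList(input.second());
        Collections.sort(records, new Comparator<Pair<SortKey, Info>>() {
          @Override
          public int compare(Pair<SortKey, Info> a, Pair<SortKey, Info> b) {
            // assumes SortKey implements Comparable<SortKey>
            return a.first().compareTo(b.first());
          }
        });
        for (Pair<SortKey, Info> record : records) {
          emitter.emit(record); // actual merge logic elided
        }
      }
    }, mergeType);

But the same question presumably applies there: does groupByKey on a
SparkPipeline guarantee that each key's Iterable is presented in a single
call?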

env: Spark 1.5 and Crunch 0.11.0

Any thoughts would be appreciated!
