From: Josh Wills
Date: Wed, 9 Dec 2015 20:56:11 -0800
Subject: Re: Secondary sort and partitioning in Spark
To: user@crunch.apache.org

Hrm-- so you're saying records for the same GroupByKey are ending up in
different partitions when you're doing a secondary sort? Sounds like a bug in
the SparkPartitioner we're using-- I wonder if it was the same bug that was
fixed here: https://issues.apache.org/jira/browse/CRUNCH-556

On Wed, Dec 9, 2015 at 6:05 PM, Andrey Gusev wrote:

> Hello crunch!
>
> I am running into problems with the partitioning of groups under secondary
> sort running on a SparkPipeline.
>
> What I am observing is that records belonging to a single group may be
> split across two or more calls to the applied DoFn. This could be a gap in
> my understanding of the Spark execution model with respect to locality -
> and if so, can *all* the records belonging to a groupBy key be forced into
> a single call?
>
> Roughly speaking, the code looks like this:
>
> PTableType<GroupByKey, Pair<SortKey, Info>> pType =
>     tableOf(Writables.writables(GroupByKey.class),
>         Writables.pairs(Writables.writables(SortKey.class),
>             Writables.writables(Info.class)));
>
> // note that dataset has been explicitly sharded by numPartitions
> PTable<GroupByKey, Pair<SortKey, Info>> infos = dataset.parallelDo(..., pType);
>
> PTable<SortKey, Info> mergedInfos =
>     SecondarySort.sortAndApply(infos, mergeInfos(...), mergeType, numPartitions);
>
> static class GroupByKey implements Writable {
>
>   public int treeId;
>   public int nodeId;
>   ...
> }
>
> I can confirm that records come in sorted and grouped, but I am also
> observing that a single group may be processed on different nodes. More
> concretely, let's say the group belonging to treeId=0, nodeId=0 has 100
> records; the first 30 may show up on node1 and the remaining on node2 (in
> both cases sorted). Informally, it looks like each node is scheduled to
> process roughly the same number of records. It's especially evident with 2
> partitions, where exactly one group is split.
>
> The semantics of the code (at least for now) require all the values for a
> group to come in within a single call. Can that be forced?
>
> env: spark 1.5 and crunch 0.11.0
>
> Any thoughts would be appreciated!
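For reference, below is a rough, untested sketch of the shape this usually
takes with Crunch's SecondarySort. A few assumptions: SortKey and Info stand
in for the Writable classes from the original message and are not defined
here; MergeInfos is a hypothetical stand-in for the poster's mergeInfos(...)
DoFn; and the value-based hashCode()/equals() on GroupByKey is just something
worth double-checking -- it is not claimed to be the fix for the partitioner
bug referenced above.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.lib.SecondarySort;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.io.Writable;

public class SecondarySortSketch {

  // Key with an explicit, value-based hashCode()/equals(). Partitioning of
  // grouped data is typically driven by a hash of the key, so a missing or
  // inconsistent hashCode() is one possible way a single group can end up
  // spread across partitions (independent of the bug discussed above).
  public static class GroupByKey implements Writable {
    public int treeId;
    public int nodeId;

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(treeId);
      out.writeInt(nodeId);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      treeId = in.readInt();
      nodeId = in.readInt();
    }

    @Override
    public int hashCode() {
      return 31 * treeId + nodeId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof GroupByKey)) {
        return false;
      }
      GroupByKey that = (GroupByKey) o;
      return treeId == that.treeId && nodeId == that.nodeId;
    }
  }

  // The DoFn handed to sortAndApply receives one
  // Pair<key, Iterable<sorted values>> per group, so when partitioning
  // behaves, all values for a key are visible in a single process() call.
  public static class MergeInfos
      extends DoFn<Pair<GroupByKey, Iterable<Pair<SortKey, Info>>>, Pair<SortKey, Info>> {
    @Override
    public void process(Pair<GroupByKey, Iterable<Pair<SortKey, Info>>> group,
                        Emitter<Pair<SortKey, Info>> emitter) {
      for (Pair<SortKey, Info> value : group.second()) {
        // Values arrive ordered by SortKey; merge and emit as needed.
        emitter.emit(value);
      }
    }
  }

  public static PTable<SortKey, Info> merge(PTable<GroupByKey, Pair<SortKey, Info>> infos,
                                            int numPartitions) {
    PTableType<SortKey, Info> mergeType =
        Writables.tableOf(Writables.writables(SortKey.class),
            Writables.writables(Info.class));
    return SecondarySort.sortAndApply(infos, new MergeInfos(), mergeType, numPartitions);
  }
}

If the SparkPartitioner bug Josh points at is the actual cause, picking up a
release that includes the CRUNCH-556 fix would be the simpler path.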