Subject: Re: Question about data frame partitioning in Spark 1.3.0
From: Michael Armbrust
Date: Wed, 14 Oct 2015 10:11:13 -0700
To: Cesar Flores
Cc: user@spark.apache.org

This won't help, for two reasons:

 1) It's all still just creating lineage, since you aren't caching the
    partitioned data. Spark will still fetch the shuffled blocks for each
    query.
 2) The query optimizer is not aware of RDD-level partitioning, since an
    RDD is mostly a black box to it.

1) could be fixed by adding caching. 2) is on our roadmap (though you'd
have to use logical DataFrame expressions to do the partitioning instead
of a class-based partitioner).

On Wed, Oct 14, 2015 at 8:45 AM, Cesar Flores wrote:

> My current version of Spark is 1.3.0, and my question is the following:
>
> I have large data frames whose main field is a user id. I need to do
> many group-bys and joins using that field. Will performance improve if,
> before doing any group-by or join operation, I first convert to an RDD
> and partition it by the user id? In other words, will trying something
> like the following lines on all my user data tables improve performance
> in the long run?
>
> val partitioned_rdd = unpartitioned_df
>   .map(row => (row.getLong(0), row))
>   .partitionBy(new HashPartitioner(200))
>   .map(x => x._2)
>
> val partitioned_df = hc.createDataFrame(partitioned_rdd,
>   unpartitioned_df.schema)
>
> Thanks a lot
> --
> Cesar Flores
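Point 1) above can be sketched in code. The following is a minimal, hedged sketch of the questioner's approach with caching added, assuming Spark 1.3-era APIs: `hc` is an existing `HiveContext` (or `SQLContext`) and `unpartitioned_df` is a DataFrame whose first column is a `Long` user id, as in the original question. It is an illustration of the advice, not a drop-in implementation.

```scala
import org.apache.spark.HashPartitioner

// Repartition the underlying RDD by user id (column 0), as in the question.
val partitioned_rdd = unpartitioned_df
  .map(row => (row.getLong(0), row))   // key each row by user id
  .partitionBy(new HashPartitioner(200))
  .map(_._2)                           // drop the key, keep the Row

val partitioned_df = hc.createDataFrame(partitioned_rdd, unpartitioned_df.schema)

// Without this, each subsequent query only extends the lineage and
// re-fetches the shuffled blocks. cache() materializes the partitioned
// data so later group-bys and joins reuse it instead of re-shuffling.
partitioned_df.cache()
```

Note that even with caching, the Spark 1.3 optimizer cannot see this RDD-level partitioning (point 2), so joins may still introduce their own shuffle.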