From: "Xuefu Zhang (JIRA)"
To: pig-dev@hadoop.apache.org
Date: Wed, 3 Jun 2015 04:23:49 +0000 (UTC)
Subject: [jira] [Updated] (PIG-4577) Use "cogroup" spark api to implement "groupby+secondarysort" case in GlobalRearrangeConverter.java

[ https://issues.apache.org/jira/browse/PIG-4577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuefu Zhang updated PIG-4577:
-----------------------------
    Resolution: Fixed
        Status: Resolved  (was: Patch Available)

Committed to Spark branch. Thanks, Liyun!
> Use "cogroup" spark api to implement "groupby+secondarysort" case in GlobalRearrangeConverter.java
> --------------------------------------------------------------------------------------------------
>
>                 Key: PIG-4577
>                 URL: https://issues.apache.org/jira/browse/PIG-4577
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4577.patch
>
>
> In PIG-4565 (Support custom MR partitioners for Spark engine), we refined the code of GlobalRearrangeConverter (using the "cogroup" spark api to implement the "groupby" and "join" cases, but not the "groupby+secondarysort" case) in PIG-4565_2.patch:
> GlobalRearrangeConverter.java
> {code}
> @Override
> public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
>                           POGlobalRearrangeSpark op) throws IOException {
>     SparkUtil.assertPredecessorSizeGreaterThan(predecessors, op, 0);
>     int parallelism = SparkUtil.getParallelism(predecessors, op);
>
>     // TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input),
>     // vs using groupBy (like we do in this commented code), vs using
>     // reduceByKey(). This is a pending task in Pig on Spark Milestone 1.
>     // Once we figure that out, we can allow custom partitioning for
>     // the secondary sort case as well.
>     // if (predecessors.size() == 1) {
>     //     // GROUP BY
>     //     JavaPairRDD<Object, Iterable<Tuple>> prdd;
>     //     if (op.isUseSecondaryKey()) {
>     //         prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
>     //     } else {
>     //         JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
>     //         prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
>     //         prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
>     //                 parallelism));
>     //     }
>     //     JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
>     //     return jrdd2.rdd();
>     // }
>     //
>     // if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
>     //     return handleSecondarySort(predecessors.get(0), op, parallelism);
>     // }
>     if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
>         return handleSecondarySort(predecessors.get(0), op, parallelism);
>     }
>
>     List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
>     for (RDD<Tuple> rdd : predecessors) {
>         JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
>         JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
>         rddPairs.add(rddPair.rdd());
>     }
>
>     // Something's wrong with the type parameters of CoGroupedRDD:
>     // key and value are the same type ???
>     CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
>             (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
>                     .asScalaBuffer(rddPairs).toSeq()),
>             SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism));
>
>     RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
>             (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
>     return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
> }
> {code}
> Actually, we can also use the "cogroup" spark api to implement the "groupby+secondarysort" case.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
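[Editor's note: for readers without the Pig/Spark codebase at hand, the idea behind the issue — that a single-input cogroup gives a group-by, after which each group's values can be ordered by a secondary key — can be sketched in plain Python. This is an illustrative sketch only (no Spark; `cogroup` and `group_by_with_secondary_sort` are hypothetical helper names, not Pig or Spark APIs):]

```python
from collections import defaultdict

def cogroup(*keyed_datasets):
    """Emulate Spark's cogroup on (key, value) pairs: for each key,
    collect one list of values per input dataset."""
    grouped = defaultdict(lambda: [[] for _ in keyed_datasets])
    for i, dataset in enumerate(keyed_datasets):
        for key, value in dataset:
            grouped[key][i].append(value)
    return dict(grouped)

def group_by_with_secondary_sort(records):
    """records: iterable of ((main_key, secondary_key), value) pairs.
    Re-key by main_key, cogroup on the single input (which is just a
    group-by), then sort each group's values by the secondary key --
    the "groupby+secondarysort" case discussed in the issue."""
    keyed = [(mk, (sk, v)) for ((mk, sk), v) in records]
    groups = cogroup(keyed)  # one input => plain group-by-key
    return {k: [v for _, v in sorted(vals[0])] for k, vals in groups.items()}

data = [(("a", 2), "x"), (("a", 1), "y"), (("b", 3), "z")]
print(group_by_with_secondary_sort(data))  # {'a': ['y', 'x'], 'b': ['z']}
```

In real Spark the sort would run distributed (e.g. per partition) rather than per group in driver memory; the sketch only shows why a single-input cogroup subsumes the group-by half of the problem.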