pig-dev mailing list archives

From "liyunzhang_intel (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (PIG-4577) Use "cogroup" spark api to implement "groupby+secondarysort" case in GlobalRearrangeConverter.java
Date Mon, 01 Jun 2015 01:22:17 GMT

     [ https://issues.apache.org/jira/browse/PIG-4577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-4577:
----------------------------------
    Attachment: PIG-4577.patch

[~mohitsabharwal]:
PIG-4577.patch makes one change:
1. Use the "cogroup" Spark API to implement the "groupby+secondarysort" case in GlobalRearrangeConverter.java.

> Use "cogroup" spark api to implement "groupby+secondarysort" case in GlobalRearrangeConverter.java
> --------------------------------------------------------------------------------------------------
>
>                 Key: PIG-4577
>                 URL: https://issues.apache.org/jira/browse/PIG-4577
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4577.patch
>
>
> In PIG-4565 (Support custom MR partitioners for Spark engine), we refined GlobalRearrangeConverter to use the "cogroup" Spark API for the "groupby" and "join" cases, but not for the "groupby+secondarysort" case.
> From PIG-4565_2.patch:
> GlobalRearrangeConverter.java
> {code}
>  @Override
>     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
>                               POGlobalRearrangeSpark op) throws IOException {
>         SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
>                 op, 0);
>         int parallelism = SparkUtil.getParallelism(predecessors,
>                 op);
> //         TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input),
> //         vs using groupBy (like we do in this commented code), vs using
> //         reduceByKey(). This is a pending task in Pig on Spark Milestone 1
> //         Once we figure that out, we can allow custom partitioning for
> //         secondary sort case as well.
> //        if (predecessors.size() == 1) {
> //            // GROUP BY
> //            JavaPairRDD<Object, Iterable<Tuple>> prdd;
> //            if (op.isUseSecondaryKey()) {
> //                prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
> //            } else {
> //                JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
> //                prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
> //                prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
> //                        parallelism));
> //            }
> //            JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
> //            return jrdd2.rdd();
> //
> //        if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
> //            return handleSecondarySort(predecessors.get(0), op, parallelism);
> //        }
>         if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
>             return handleSecondarySort(predecessors.get(0), op, parallelism);
>         }
>         List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
>         for (RDD<Tuple> rdd : predecessors) {
>             JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
>             JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
>             rddPairs.add(rddPair.rdd());
>         }
>         // Something's wrong with the type parameters of CoGroupedRDD
>         // key and value are the same type ???
>         CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
>                 (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
>                         .asScalaBuffer(rddPairs).toSeq()),
>                 SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism)
>         );
>         RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
>             (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
>         return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
>     }
> {code}
> In fact, the "cogroup" Spark API can also be used to implement the "groupby+secondarysort" case.
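
For readers unfamiliar with the "groupby+secondarysort" case, here is a minimal sketch of the semantics it has to produce: rows are grouped by a main key, and each group's bag is ordered by a secondary key. This is not the code in PIG-4577.patch and it does not call Spark. Plain Java collections stand in for the cogroup step, and the names Row, mainKey, secondaryKey and groupThenSort are hypothetical, introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SecondarySortSketch {

    /** Hypothetical record: a main (grouping) key plus a secondary (ordering) key. */
    public static final class Row {
        public final String mainKey;
        public final int secondaryKey;

        public Row(String mainKey, int secondaryKey) {
            this.mainKey = mainKey;
            this.secondaryKey = secondaryKey;
        }

        @Override
        public String toString() {
            return mainKey + ":" + secondaryKey;
        }
    }

    /**
     * Step 1 mimics a single-input cogroup: collect every row under its main key.
     * Step 2 applies the secondary sort: order each group's bag by the secondary key.
     * (Assumption: in-memory collections stand in for the distributed shuffle.)
     */
    public static Map<String, List<Row>> groupThenSort(List<Row> rows) {
        // TreeMap keeps main keys in sorted order so the output is deterministic.
        Map<String, List<Row>> groups = new TreeMap<>();
        for (Row r : rows) {
            groups.computeIfAbsent(r.mainKey, k -> new ArrayList<>()).add(r);
        }
        for (List<Row> bag : groups.values()) {
            bag.sort(Comparator.comparingInt(r -> r.secondaryKey));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("a", 3), new Row("b", 2), new Row("a", 1),
                new Row("b", 5), new Row("a", 2));
        System.out.println(groupThenSort(rows));
    }
}
```

In the Spark engine the grouping step would be distributed and the per-group ordering would have to be handled within each partition, so treat this only as a description of the expected result, not of how the patch achieves it.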



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
