pig-commits mailing list archives

From sz...@apache.org
Subject svn commit: r1796639 [4/12] - in /pig/trunk: ./ bin/ ivy/ src/META-INF/services/ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/execution...
Date Mon, 29 May 2017 15:00:41 GMT
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+@SuppressWarnings("serial")
+public class FRJoinConverter implements
+        RDDConverter<Tuple, Tuple, POFRJoin> {
+    private static final Log LOG = LogFactory.getLog(FRJoinConverter.class);
+
+    private Set<String> replicatedInputs;
+
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+                              POFRJoin poFRJoin) throws IOException {
+        SparkUtil.assertPredecessorSizeGreaterThan(predecessors, poFRJoin, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+
+        attachReplicatedInputs((POFRJoinSpark) poFRJoin);
+
+        FRJoinFunction frJoinFunction = new FRJoinFunction(poFRJoin);
+        return rdd.toJavaRDD().mapPartitions(frJoinFunction, true).rdd();
+    }
+
+    private void attachReplicatedInputs(POFRJoinSpark poFRJoin) {
+        Map<String, List<Tuple>> replicatedInputMap = new HashMap<>();
+
+        for (String replicatedInput : replicatedInputs) {
+            replicatedInputMap.put(replicatedInput, SparkPigContext.get().getBroadcastedVars().get(replicatedInput).value());
+        }
+
+        poFRJoin.attachInputs(replicatedInputMap);
+    }
+
+    private static class FRJoinFunction implements
+            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+
+        private POFRJoin poFRJoin;
+        private FRJoinFunction(POFRJoin poFRJoin) {
+            this.poFRJoin = poFRJoin;
+        }
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple> input) throws Exception {
+
+            return new Iterable<Tuple>() {
+
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poFRJoin.setInputs(null);
+                            poFRJoin.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return poFRJoin.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                        }
+                    };
+                }
+            };
+        }
+
+    }
+
+    public void setReplicatedInputs(Set<String> replicatedInputs) {
+        this.replicatedInputs = replicatedInputs;
+    }
+}
\ No newline at end of file
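
The FRJoinConverter above realizes Pig's fragment-replicate join on Spark: the replicated (small) inputs are pulled out of Spark broadcast variables and attached to POFRJoinSpark up front, while the single large input is streamed through mapPartitions. Below is a rough, stand-alone sketch of that idea, assuming a simplified String[] row format and hypothetical class and method names instead of Pig's Tuple and operator APIs; the essential property is that the small side is fully materialized per task, so the large side never has to be shuffled.

import java.util.*;

// Hypothetical illustration of a fragment-replicate join: the small side is
// fully materialized in memory (as a broadcast variable would be in Spark),
// the large side is streamed one partition at a time.
public class FragmentReplicateJoinSketch {

    // Join one partition of the large input against the replicated small input.
    static List<String[]> joinPartition(Iterator<String[]> largePartition,
                                        Map<String, List<String[]>> replicatedSmallInput) {
        List<String[]> out = new ArrayList<>();
        while (largePartition.hasNext()) {
            String[] left = largePartition.next();
            String key = left[0];                       // join key in column 0
            List<String[]> matches = replicatedSmallInput.get(key);
            if (matches == null) {
                continue;                               // inner join: drop unmatched rows
            }
            for (String[] right : matches) {
                out.add(new String[]{left[0], left[1], right[1]});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String[]>> small = new HashMap<>();
        small.put("a", Collections.singletonList(new String[]{"a", "small-1"}));

        Iterator<String[]> large = Arrays.asList(
                new String[]{"a", "big-1"},
                new String[]{"b", "big-2"}).iterator();

        for (String[] row : joinPartition(large, small)) {
            System.out.println(String.join(",", row));   // prints: a,big-1,small-1
        }
    }
}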

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+import java.util.List;
+
+import scala.runtime.AbstractFunction1;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.rdd.RDD;
+
+/**
+ * Converter that converts an RDD to a filtered RDD using POFilter
+ */
+@SuppressWarnings({ "serial" })
+public class FilterConverter implements RDDConverter<Tuple, Tuple, POFilter> {
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            POFilter physicalOperator) {
+        SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        FilterFunction filterFunction = new FilterFunction(physicalOperator);
+        return rdd.filter(filterFunction);
+    }
+
+    private static class FilterFunction extends
+            AbstractFunction1<Tuple, Object> implements Serializable {
+
+        private POFilter poFilter;
+
+        private FilterFunction(POFilter poFilter) {
+            this.poFilter = poFilter;
+        }
+
+        @Override
+        public Boolean apply(Tuple v1) {
+            Result result;
+            try {
+                poFilter.setInputs(null);
+                poFilter.attachInput(v1);
+                result = poFilter.getNextTuple();
+            } catch (ExecException e) {
+                throw new RuntimeException("Couldn't filter tuple", e);
+            }
+
+            if (result == null) {
+                return false;
+            }
+
+            switch (result.returnStatus) {
+            case POStatus.STATUS_OK:
+                return true;
+            case POStatus.STATUS_EOP: // TODO: probably also ok for EOS,
+                                      // END_OF_BATCH
+                return false;
+            default:
+                throw new RuntimeException(
+                        "Unexpected response code from filter: " + result);
+            }
+        }
+    }
+}
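
FilterConverter wraps POFilter in a Scala AbstractFunction1 so that RDD.filter() can invoke it tuple by tuple; a tuple survives only when the operator answers STATUS_OK, is dropped on STATUS_EOP, and any other status is treated as an error. A small self-contained sketch of that status-to-boolean mapping, using a hypothetical Status enum in place of POStatus:

import java.util.*;
import java.util.function.Predicate;

// Hypothetical sketch of mapping an operator's return status to a filter decision.
public class FilterStatusSketch {

    enum Status { OK, END_OF_PROCESSING, ERROR }

    // Mirrors FilterFunction.apply(): keep on OK, drop on end-of-processing,
    // fail fast on anything unexpected.
    static boolean keep(Status status) {
        switch (status) {
            case OK:
                return true;
            case END_OF_PROCESSING:
                return false;
            default:
                throw new RuntimeException("Unexpected response code from filter: " + status);
        }
    }

    public static void main(String[] args) {
        List<Status> results = Arrays.asList(Status.OK, Status.END_OF_PROCESSING, Status.OK);
        Predicate<Status> predicate = FilterStatusSketch::keep;
        results.stream().filter(predicate).forEach(System.out::println); // OK, OK
    }
}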

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+/**
+ * Converter that converts an RDD to another RDD using a POForEach
+ */
+@SuppressWarnings({"serial" })
+public class ForEachConverter implements RDDConverter<Tuple, Tuple, POForEach> {
+
+    private JobConf jobConf;
+
+    public ForEachConverter(JobConf jobConf) {
+        this.jobConf = jobConf;
+    }
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            POForEach physicalOperator) {
+
+        byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+
+        SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, confBytes);
+
+        return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
+    }
+
+    private static class ForEachFunction implements
+            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+
+        private POForEach poForEach;
+        private byte[] confBytes;
+        private transient JobConf jobConf;
+
+        private ForEachFunction(POForEach poForEach, byte[] confBytes) {
+            this.poForEach = poForEach;
+            this.confBytes = confBytes;
+        }
+
+        public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+            initialize();
+
+            // Initialize a reporter as the UDF might want to report progress.
+            PhysicalOperator.setReporter(new ProgressableReporter());
+            PhysicalOperator[] planLeafOps= poForEach.getPlanLeafOps();
+            if (planLeafOps != null) {
+                for (PhysicalOperator op : planLeafOps) {
+                    if (op.getClass() == POUserFunc.class) {
+                        POUserFunc udf = (POUserFunc) op;
+                        udf.setFuncInputSchema();
+                    }
+                }
+            }
+
+
+            return new Iterable<Tuple>() {
+
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poForEach.setInputs(null);
+                            poForEach.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return poForEach.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                        }
+                    };
+                }
+            };
+        }
+
+        private void initialize() {
+            if (this.jobConf == null) {
+                try {
+                    this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+                    PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+                    SchemaTupleBackend.initialize(jobConf, pc);
+                } catch (IOException e) {
+                    throw new RuntimeException("Couldn't initialize ForEachConverter", e);
+                }
+            }
+        }
+    }
+}
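
ForEachFunction ships the JobConf to executors as a Kryo-serialized byte array and rebuilds it lazily in initialize(), since the configuration object itself does not travel inside the closure and only needs to be reconstructed once per deserialized function instance. The same transient-field lazy-initialization pattern, shown stand-alone with a plain byte[] payload and hypothetical names:

import java.io.*;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the "serialize bytes, rebuild lazily on the executor"
// pattern used by ForEachFunction for the JobConf.
public class LazyConfigFunction implements Serializable {

    private final byte[] configBytes;          // shipped with the closure
    private transient String config;           // rebuilt once after deserialization

    public LazyConfigFunction(String config) {
        this.configBytes = config.getBytes(StandardCharsets.UTF_8);
    }

    private void initialize() {
        if (config == null) {                  // transient field is null on the remote side
            config = new String(configBytes, StandardCharsets.UTF_8);
        }
    }

    public String apply(String input) {
        initialize();
        return config + ":" + input;
    }

    public static void main(String[] args) {
        LazyConfigFunction f = new LazyConfigFunction("job-conf");
        System.out.println(f.apply("tuple-1"));   // job-conf:tuple-1
    }
}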

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.rdd.CoGroupedRDD;
+import org.apache.spark.rdd.RDD;
+
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.runtime.AbstractFunction1;
+
+@SuppressWarnings({ "serial" })
+public class GlobalRearrangeConverter implements
+        RDDConverter<Tuple, Tuple, POGlobalRearrangeSpark> {
+    private static final Log LOG = LogFactory
+            .getLog(GlobalRearrangeConverter.class);
+
+    private static final TupleFactory tf = TupleFactory.getInstance();
+ 
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+                              POGlobalRearrangeSpark op) throws IOException {
+        SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
+                op, 0);
+        int parallelism = SparkPigContext.get().getParallelism(predecessors,
+                op);
+
+//         TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input),
+//         vs using groupBy (like we do in this commented code), vs using
+//         reduceByKey(). This is a pending task in Pig on Spark Milestone 1
+//         Once we figure that out, we can allow custom partitioning for
+//         secondary sort case as well.
+//        if (predecessors.size() == 1) {
+//            // GROUP BY
+//            JavaPairRDD<Object, Iterable<Tuple>> prdd;
+//            if (op.isUseSecondaryKey()) {
+//                prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
+//            } else {
+//                JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
+//                prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
+//                prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
+//                        parallelism));
+//            }
+//            JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
+//            return jrdd2.rdd();
+//
+//        if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
+//            return handleSecondarySort(predecessors.get(0), op, parallelism);
+//        }
+        List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
+
+        if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
+            rddPairs.add(handleSecondarySort(predecessors.get(0), op, parallelism).rdd());
+        } else {
+            for (RDD<Tuple> rdd : predecessors) {
+                JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
+                JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
+                rddPairs.add(rddPair.rdd());
+            }
+        }
+
+        // Something's wrong with the type parameters of CoGroupedRDD
+        // key and value are the same type ???
+        CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
+                (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
+                        .asScalaBuffer(rddPairs).toSeq()),
+                SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism), SparkUtil.getManifest(Object.class));
+
+        RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
+            (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+        return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
+    }
+
+    private JavaRDD<Tuple2<IndexedKey,Tuple>> handleSecondarySort(
+            RDD<Tuple> rdd, POGlobalRearrangeSpark op, int parallelism) {
+
+        RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
+                SparkUtil.<Tuple, Object>getTuple2Manifest());
+
+        JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
+                SparkUtil.getManifest(Tuple.class),
+                SparkUtil.getManifest(Object.class));
+
+        // First sort the tuples by the secondary key when useSecondaryKey sorting is enabled
+        JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
+                new HashPartitioner(parallelism),
+                new PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder()));
+        JavaRDD<Tuple> jrdd = sorted.keys();
+        JavaRDD<Tuple2<IndexedKey,Tuple>> jrddPair = jrdd.map(new ToKeyValueFunction(op));
+        return jrddPair;
+    }
+
+    private static class RemoveValueFunction implements
+            FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
+
+        private class Tuple2TransformIterable implements Iterable<Tuple> {
+
+            Iterator<Tuple2<Tuple, Object>> in;
+
+            Tuple2TransformIterable(Iterator<Tuple2<Tuple, Object>> input) {
+                in = input;
+            }
+
+            public Iterator<Tuple> iterator() {
+                return new IteratorTransform<Tuple2<Tuple, Object>, Tuple>(in) {
+                    @Override
+                    protected Tuple transform(Tuple2<Tuple, Object> next) {
+                        return next._1();
+                    }
+                };
+            }
+        }
+
+        @Override
+        public Iterable<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
+            return new Tuple2TransformIterable(input);
+        }
+    }
+
+    private static class ToKeyNullValueFunction extends
+            AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+            Serializable {
+
+        @Override
+        public Tuple2<Tuple, Object> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ToKeyNullValueFunction in " + t);
+            }
+
+            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(t, null);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ToKeyNullValueFunction out " + out);
+            }
+
+            return out;
+        }
+    }
+
+    /**
+     * Function that extracts keys from locally rearranged tuples.
+     */
+    private static class GetKeyFunction implements Function<Tuple, Object>, Serializable {
+        public final POGlobalRearrangeSpark glrSpark;
+
+        public GetKeyFunction(POGlobalRearrangeSpark globalRearrangeSpark) {
+            this.glrSpark = globalRearrangeSpark;
+        }
+
+        public Object call(Tuple t) {
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GetKeyFunction in " + t);
+                }
+
+                Object key;
+                if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) {
+                    key = ((Tuple) t.get(1)).get(0);
+                } else {
+                    key = t.get(1);
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GetKeyFunction out " + key);
+                }
+
+                return key;
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Function that converts elements of PairRDD to regular RDD.
+     * - Input (PairRDD) contains elements of the form
+     * Tuple2<key, Iterable<(index, key, value)>>.
+     * - Output (regular RDD) contains elements of the form
+     * Tuple<(key, iterator to (index, key, value))>
+     */
+    private static class GroupTupleFunction
+            implements Function<Tuple2<Object, Iterable<Tuple>>, Tuple>,
+            Serializable {
+        public final POGlobalRearrangeSpark glrSpark;
+
+        public GroupTupleFunction(POGlobalRearrangeSpark globalRearrangeSpark) {
+            this.glrSpark = globalRearrangeSpark;
+        }
+
+        public Tuple call(Tuple2<Object, Iterable<Tuple>> v1) {
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GroupTupleFunction in " + v1);
+                }
+
+                Tuple tuple = tf.newTuple(2);
+                tuple.set(0, v1._1()); // key
+                // Note that v1._2() is an (index, key, value) tuple, and
+                // v1._2().iterator() is an iterator over Seq<Tuple> (aka the bag of values)
+                tuple.set(1, v1._2().iterator());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GroupTupleFunction out " + tuple);
+                }
+
+                return tuple;
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+
+    /**
+     * Converts an incoming locally rearranged tuple of the form
+     * (index, key, value) into a Tuple2<key, value>.
+     */
+    private static class ToKeyValueFunction implements
+            Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {
+
+        private POGlobalRearrangeSpark glrSpark = null;
+
+        public ToKeyValueFunction(POGlobalRearrangeSpark glrSpark) {
+            this.glrSpark = glrSpark;
+        }
+
+        public ToKeyValueFunction() {
+
+        }
+
+        @Override
+        public Tuple2<IndexedKey, Tuple> call(Tuple t) {
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToKeyValueFunction in " + t);
+                }
+
+                Object key = null;
+                if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) {
+                    key = ((Tuple) t.get(1)).get(0);
+                } else {
+                    key = t.get(1);
+                }
+
+                Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(
+                        new IndexedKey((Byte) t.get(0), key),
+                        (Tuple) t.get(2));
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToKeyValueFunction out " + out);
+                }
+
+                return out;
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Converts cogroup output, where each element is {key, bag[]} represented
+     * as Tuple2<Object, Seq<Seq<Tuple>>>, into a tuple of the form
+     * (key, Iterator<(index, key, value)>).
+     */
+    private static class ToGroupKeyValueFunction implements
+            Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable {
+
+        @Override
+        public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToGroupKeyValueFunction in " + input);
+                }
+
+                final Object key = input._1().getKey();
+                Object obj = input._2();
+                // XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
+                Seq<Tuple>[] bags = (Seq<Tuple>[])obj;
+                int i = 0;
+                List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
+                for (int j = 0; j < bags.length; j ++) {
+                    Seq<Tuple> bag = bags[j];
+                    Iterator<Tuple> iterator = JavaConversions
+                            .asJavaCollection(bag).iterator();
+                    final int index = i;
+                    tupleIterators.add(new IteratorTransform<Tuple, Tuple>(
+                            iterator) {
+                        @Override
+                        protected Tuple transform(Tuple next) {
+                            try {
+                                Tuple tuple = tf.newTuple(3);
+                                tuple.set(0, index);
+                                tuple.set(1, next);
+                                return tuple;
+                            } catch (ExecException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    });
+                    ++ i;
+                }
+
+                Tuple out = tf.newTuple(2);
+                out.set(0, key);
+                out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToGroupKeyValueFunction out " + out);
+                }
+
+                return out;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private static class IteratorUnion<T> implements Iterator<T> {
+
+        private final Iterator<Iterator<T>> iterators;
+
+        private Iterator<T> current;
+
+        public IteratorUnion(Iterator<Iterator<T>> iterators) {
+            super();
+            this.iterators = iterators;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (current != null && current.hasNext()) {
+                return true;
+            } else if (iterators.hasNext()) {
+                current = iterators.next();
+                return hasNext();
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public T next() {
+            return current.next();
+        }
+
+        @Override
+        public void remove() {
+            current.remove();
+        }
+
+    }
+}
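
GlobalRearrangeConverter keys every predecessor RDD by an IndexedKey (input index plus Pig key) and feeds them all into a single CoGroupedRDD, so that downstream each key is seen exactly once together with one bag of tuples per input. A toy in-memory cogroup that mimics that shape, assuming plain String keys and values and hypothetical names:

import java.util.*;

// Hypothetical in-memory cogroup: for every key, collect one bag per input,
// analogous to what CoGroupedRDD produces for GlobalRearrangeConverter.
public class CoGroupSketch {

    static Map<String, List<List<String>>> cogroup(List<List<Map.Entry<String, String>>> inputs) {
        Map<String, List<List<String>>> grouped = new LinkedHashMap<>();
        for (int index = 0; index < inputs.size(); index++) {
            for (Map.Entry<String, String> pair : inputs.get(index)) {
                List<List<String>> bags = grouped.computeIfAbsent(pair.getKey(), k -> {
                    List<List<String>> perInput = new ArrayList<>();
                    for (int i = 0; i < inputs.size(); i++) {
                        perInput.add(new ArrayList<>());
                    }
                    return perInput;
                });
                bags.get(index).add(pair.getValue());    // value lands in its own input's bag
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> left = Arrays.<Map.Entry<String, String>>asList(
                new AbstractMap.SimpleEntry<String, String>("k1", "l1"),
                new AbstractMap.SimpleEntry<String, String>("k2", "l2"));
        List<Map.Entry<String, String>> right = Collections.<Map.Entry<String, String>>singletonList(
                new AbstractMap.SimpleEntry<String, String>("k1", "r1"));

        // prints: {k1=[[l1], [r1]], k2=[[l2], []]}
        System.out.println(cogroup(Arrays.asList(left, right)));
    }
}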

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java Mon May 29 15:00:39 2017
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import org.joda.time.DateTime;
+
+/**
+ * IndexedKey records the index and key info.
+ * This is used as the key for JOINs. It addresses the case where the key is
+ * null (or is a tuple with one or more null fields). In that case,
+ * we must respect the SQL standard as documented in the equals() method.
+ */
+public class IndexedKey implements Serializable, Comparable {
+    private static final Log LOG = LogFactory.getLog(IndexedKey.class);
+    private byte index;
+    private Object key;
+    private boolean useSecondaryKey;
+    private boolean[] secondarySortOrder;
+
+    public IndexedKey(byte index, Object key) {
+        this.index = index;
+        this.key = key;
+    }
+
+    public byte getIndex() {
+        return index;
+    }
+
+    public Object getKey() {
+        return key;
+    }
+
+    @Override
+    public String toString() {
+        return "IndexedKey{" +
+                "index=" + index +
+                ", key=" + key +
+                '}';
+    }
+
+    /**
+     * If the key is null, we'd like to compute equality based on both key and index.
+     * If the key is not null, we'd like to compute equality based on just the key (as we normally do).
+     * There are two possible cases when two tuples are compared:
+     * 1) Compare tuples of same table (same index)
+     * 2) Compare tuples of different tables (different index values)
+     * In 1)
+     * key1    key2    equal?
+     * null    null      Y
+     * foo     null      N
+     * null    foo       N
+     * foo     foo       Y
+     * (1,1)   (1,1)     Y
+     * (1,)    (1,)      Y
+     * (1,2)   (1,2)     Y
+     * <p/>
+     * <p/>
+     * In 2)
+     * key1    key2    equal?
+     * null    null     N
+     * foo     null     N
+     * null    foo      N
+     * foo     foo      Y
+     * (1,1)   (1,1)    Y
+     * (1,)    (1,)     N
+     * (1,2)   (1,2)    Y
+     *
+     * @param o
+     * @return
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        IndexedKey that = (IndexedKey) o;
+        if (index == that.index) {
+            if (key == null && that.key == null) {
+                return true;
+            } else if (key == null || that.key == null) {
+                return false;
+            } else {
+                if (key instanceof DateTime) {
+                    //In case of DateTime we use the less strict isEqual method so that
+                    //e.g. 2017.01.01T10:00:00.000Z and 2017.01.01T11:00:00.000+01:00 are equal
+                    return ((DateTime) key).isEqual((DateTime)that.key);
+                } else {
+                    return key.equals(that.key);
+                }
+            }
+        } else {
+            if (key == null || that.key == null) {
+                return false;
+            } else if (key.equals(that.key) && !containNullfields(key)) {
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    private boolean containNullfields(Object key) {
+        if (key instanceof Tuple) {
+            for (int i = 0; i < ((Tuple) key).size(); i++) {
+                try {
+                    if (((Tuple) key).get(i) == null) {
+                        return true;
+                    }
+                } catch (ExecException e) {
+                    throw new RuntimeException("exception found in " +
+                            "containNullfields", e);
+
+                }
+            }
+        }
+        return false;
+
+    }
+
+    /**
+     * Calculate hashCode from index and key:
+     * if the key is null, return the index value;
+     * if the key is not null, return key.hashCode().
+     */
+    @Override
+    public int hashCode() {
+        int result = 0;
+        if (key == null) {
+            result = (int) index;
+        } else {
+            if (key instanceof DateTime) {
+                //In case of DateTime we use a custom hashCode() to avoid chronology taking part in the hash value
+                DateTime dt = (DateTime)key;
+                result = (int) (dt.getMillis() ^ dt.getMillis() >>> 32);
+            } else {
+                result = key.hashCode();
+            }
+        }
+        return result;
+    }
+
+    // First compare the index,
+    // then compare the key (both the primary and, if enabled, the secondary key).
+    @Override
+    public int compareTo(Object o) {
+        IndexedKey that = (IndexedKey) o;
+        int res = index - that.getIndex();
+        if (res > 0) {
+            return 1;
+        } else if (res < 0) {
+            return -1;
+        } else {
+            if (useSecondaryKey) {
+                Tuple thisCompoundKey = (Tuple) key;
+                Tuple thatCompoundKey = (Tuple)that.getKey();
+                PigSecondaryKeyComparatorSpark comparator = new PigSecondaryKeyComparatorSpark(secondarySortOrder);
+                return comparator.compareCompoundKey(thisCompoundKey, thatCompoundKey);
+            } else {
+                return DataType.compare(key, that.getKey());
+            }
+        }
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+        this.secondarySortOrder = secondarySortOrder;
+    }
+}
\ No newline at end of file
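
IndexedKey folds SQL null semantics into equals(): within one input (same index) a null key still matches a null key, so grouping keeps null keys together, but across inputs (different index) a null key, or a key tuple containing a null field, never matches. A stripped-down illustration of the same rule over plain Object keys, with hypothetical names (a key "tuple" is modeled here as a java.util.List):

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch of IndexedKey's null handling: null keys (or keys with
// null fields) only compare equal within the same input index.
public class NullAwareKeySketch {

    static boolean equalForJoin(byte indexA, Object keyA, byte indexB, Object keyB) {
        if (indexA == indexB) {
            return Objects.equals(keyA, keyB);              // GROUP BY: nulls group together
        }
        if (keyA == null || keyB == null) {
            return false;                                   // JOIN: null never matches null
        }
        return keyA.equals(keyB) && !containsNullField(keyA);
    }

    // A key "tuple" is modeled as a List; any null field disables cross-input equality.
    static boolean containsNullField(Object key) {
        return key instanceof List && ((List<?>) key).contains(null);
    }

    public static void main(String[] args) {
        System.out.println(equalForJoin((byte) 0, null, (byte) 0, null));   // true  (same input)
        System.out.println(equalForJoin((byte) 0, null, (byte) 1, null));   // false (join on null)
        System.out.println(equalForJoin((byte) 0, Arrays.asList(1, null),
                                        (byte) 1, Arrays.asList(1, null))); // false (null field)
    }
}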

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java Mon May 29 15:00:39 2017
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.util.Iterator;
+
+abstract class IteratorTransform<IN, OUT> implements Iterator<OUT> {
+    private Iterator<IN> delegate;
+
+    public IteratorTransform(Iterator<IN> delegate) {
+        super();
+        this.delegate = delegate;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return delegate.hasNext();
+    }
+
+    @Override
+    public OUT next() {
+        return transform(delegate.next());
+    }
+
+    abstract protected OUT transform(IN next);
+
+    @Override
+    public void remove() {
+        delegate.remove();
+    }
+
+}
\ No newline at end of file
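
IteratorTransform is a small lazy map over an iterator: each element is converted only when next() is called, which lets the converters stream a partition without materializing it. A self-contained copy of the idea together with a usage example:

import java.util.Arrays;
import java.util.Iterator;

// Minimal lazy transforming iterator, mirroring IteratorTransform's shape.
public class IteratorTransformSketch {

    abstract static class Transforming<IN, OUT> implements Iterator<OUT> {
        private final Iterator<IN> delegate;

        Transforming(Iterator<IN> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public OUT next() {
            return transform(delegate.next());   // element converted lazily, on demand
        }

        protected abstract OUT transform(IN next);
    }

    public static void main(String[] args) {
        Iterator<Integer> numbers = Arrays.asList(1, 2, 3).iterator();
        Iterator<String> labels = new Transforming<Integer, String>(numbers) {
            @Override
            protected String transform(Integer next) {
                return "tuple-" + next;
            }
        };
        while (labels.hasNext()) {
            System.out.println(labels.next());   // tuple-1, tuple-2, tuple-3
        }
    }
}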

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.rdd.CoGroupedRDD;
+import org.apache.spark.rdd.RDD;
+
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.runtime.AbstractFunction1;
+
+public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> {
+    private static final Log LOG = LogFactory
+            .getLog(JoinGroupSparkConverter.class);
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POJoinGroupSpark op) throws IOException {
+        SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
+                op, 0);
+        List<POLocalRearrange> lraOps = op.getLROps();
+        POGlobalRearrangeSpark glaOp = op.getGROp();
+        POPackage pkgOp = op.getPkgOp();
+        int parallelism = SparkPigContext.get().getParallelism(predecessors, glaOp);
+        List<RDD<Tuple2<IndexedKey, Tuple>>> rddAfterLRA = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
+        boolean useSecondaryKey = glaOp.isUseSecondaryKey();
+
+        for (int i = 0; i < predecessors.size(); i++) {
+            RDD<Tuple> rdd = predecessors.get(i);
+            rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp),
+                    SparkUtil.<IndexedKey, Tuple>getTuple2Manifest()));
+        }
+        if (rddAfterLRA.size() == 1 && useSecondaryKey) {
+            return SecondaryKeySortUtil.handleSecondarySort(rddAfterLRA.get(0), pkgOp);
+        } else {
+
+            CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
+                    (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
+                            .asScalaBuffer(rddAfterLRA).toSeq()),
+                    SparkUtil.getPartitioner(glaOp.getCustomPartitioner(), parallelism),
+                    SparkUtil.getManifest(Object.class));
+
+            RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
+                    (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+            return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp)).rdd();
+        }
+    }
+
+    private static class LocalRearrangeFunction extends
+            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
+
+        private final POLocalRearrange lra;
+
+        private boolean useSecondaryKey;
+        private boolean[] secondarySortOrder;
+
+        public LocalRearrangeFunction(POLocalRearrange lra, POGlobalRearrangeSpark glaOp) {
+            if( glaOp.isUseSecondaryKey()) {
+                this.useSecondaryKey = glaOp.isUseSecondaryKey();
+                this.secondarySortOrder = glaOp.getSecondarySortOrder();
+            }
+            this.lra = lra;
+        }
+
+        @Override
+        public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LocalRearrangeFunction in " + t);
+            }
+            Result result;
+            try {
+                lra.setInputs(null);
+                lra.attachInput(t);
+                result = lra.getNextTuple();
+
+                if (result == null) {
+                    throw new RuntimeException(
+                            "Null response found for LocalRearrange on tuple: "
+                                    + t);
+                }
+
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        // (index, key, value without keys)
+                        Tuple resultTuple = (Tuple) result.result;
+                        Object key = resultTuple.get(1);
+                        IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
+                        if( useSecondaryKey) {
+                            indexedKey.setUseSecondaryKey(useSecondaryKey);
+                            indexedKey.setSecondarySortOrder(secondarySortOrder);
+                        }
+                        Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
+                                (Tuple) resultTuple.get(2));
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("LocalRearrangeFunction out " + out);
+                        }
+                        return out;
+                    default:
+                        throw new RuntimeException(
+                                "Unexpected response code from operator "
+                                        + lra + " : " + result);
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException(
+                        "Couldn't do LocalRearrange on tuple: " + t, e);
+            }
+        }
+
+    }
+
+    /**
+     * Send cogroup output, where each element is {key, bag[]}, to POPackage,
+     * then call POPackage#getNextTuple to get the result.
+     */
+    private static class GroupPkgFunction implements
+            Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable {
+
+        private final POPackage pkgOp;
+
+        public GroupPkgFunction(POPackage pkgOp) {
+            this.pkgOp = pkgOp;
+        }
+
+        @Override
+        public Tuple call(final Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GroupPkgFunction in " + input);
+                }
+
+                final PigNullableWritable key = new PigNullableWritable() {
+
+                    public Object getValueAsPigType() {
+                        IndexedKey keyTuple = input._1();
+                        return keyTuple.getKey();
+                    }
+                };
+                Object obj = input._2();
+                // XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
+                Seq<Tuple>[] bags = (Seq<Tuple>[]) obj;
+                int i = 0;
+                List<Iterator<NullableTuple>> tupleIterators = new ArrayList<Iterator<NullableTuple>>();
+                for (int j = 0; j < bags.length; j++) {
+                    Seq<Tuple> bag = bags[j];
+                    Iterator<Tuple> iterator = JavaConversions
+                            .asJavaCollection(bag).iterator();
+                    final int index = i;
+                    tupleIterators.add(new IteratorTransform<Tuple, NullableTuple>(
+                            iterator) {
+                        @Override
+                        protected NullableTuple transform(Tuple next) {
+                            NullableTuple nullableTuple = new NullableTuple(next);
+                            nullableTuple.setIndex((byte) index);
+                            return nullableTuple;
+                        }
+                    });
+                    ++i;
+                }
+
+
+                pkgOp.setInputs(null);
+                pkgOp.attachInput(key, new IteratorUnion<NullableTuple>(tupleIterators.iterator()));
+                Result result = pkgOp.getNextTuple();
+                if (result == null) {
+                    throw new RuntimeException(
+                            "Null response found for Package on key: " + key);
+                }
+                Tuple out;
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        // (key, {(value)...})
+                        out = (Tuple) result.result;
+                        break;
+                    case POStatus.STATUS_NULL:
+                        out = null;
+                        break;
+                    default:
+                        throw new RuntimeException(
+                                "Unexpected response code from operator "
+                                        + pkgOp + " : " + result + " "
+                                        + result.returnStatus);
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GroupPkgFunction out " + out);
+                }
+                return out;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+
+    private static class IteratorUnion<T> implements Iterator<T> {
+
+        private final Iterator<Iterator<T>> iterators;
+
+        private Iterator<T> current;
+
+        public IteratorUnion(Iterator<Iterator<T>> iterators) {
+            super();
+            this.iterators = iterators;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (current != null && current.hasNext()) {
+                return true;
+            } else if (iterators.hasNext()) {
+                current = iterators.next();
+                return hasNext();
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public T next() {
+            return current.next();
+        }
+
+        @Override
+        public void remove() {
+            current.remove();
+        }
+
+    }
+}
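
Both GlobalRearrangeConverter and JoinGroupSparkConverter carry a private IteratorUnion that concatenates the per-input bag iterators into one stream for POPackage, advancing to the next inner iterator only when the current one is exhausted. A stand-alone variant (written iteratively rather than with the recursive hasNext() of the original) with a usage example:

import java.util.Arrays;
import java.util.Iterator;

// Minimal iterator-of-iterators concatenation, mirroring the IteratorUnion helper.
public class IteratorUnionSketch<T> implements Iterator<T> {

    private final Iterator<Iterator<T>> iterators;
    private Iterator<T> current;

    public IteratorUnionSketch(Iterator<Iterator<T>> iterators) {
        this.iterators = iterators;
    }

    @Override
    public boolean hasNext() {
        while ((current == null || !current.hasNext()) && iterators.hasNext()) {
            current = iterators.next();          // move on to the next non-empty bag
        }
        return current != null && current.hasNext();
    }

    @Override
    public T next() {
        return current.next();
    }

    public static void main(String[] args) {
        Iterator<Iterator<String>> bags = Arrays.asList(
                Arrays.asList("a", "b").iterator(),
                Arrays.<String>asList().iterator(),
                Arrays.asList("c").iterator()).iterator();
        IteratorUnionSketch<String> union = new IteratorUnionSketch<>(bags);
        while (union.hasNext()) {
            System.out.println(union.next());    // a, b, c
        }
    }
}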

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+@SuppressWarnings({ "serial" })
+public class LimitConverter implements RDDConverter<Tuple, Tuple, POLimit> {
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POLimit poLimit)
+            throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, poLimit, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        LimitFunction limitFunction = new LimitFunction(poLimit);
+        RDD<Tuple> rdd2 = rdd.coalesce(1, false, null);
+        return rdd2.toJavaRDD().mapPartitions(limitFunction, false).rdd();
+    }
+
+    private static class LimitFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> {
+
+        private final POLimit poLimit;
+
+        public LimitFunction(POLimit poLimit) {
+            this.poLimit = poLimit;
+        }
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple> tuples) {
+
+            return new Iterable<Tuple>() {
+
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(tuples) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poLimit.setInputs(null);
+                            poLimit.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return poLimit.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                        }
+                    };
+                }
+            };
+        }
+    }
+}
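
LimitConverter first coalesces the RDD down to a single partition and only then runs POLimit via mapPartitions; applying the limit per partition across many partitions would return up to limit x numPartitions rows. A tiny sketch of that ordering, assuming plain Java lists in place of RDDs and hypothetical names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: merge all partitions into one stream first, then apply the
// limit, mirroring LimitConverter's coalesce(1) followed by the POLimit pass.
public class GlobalLimitSketch {

    static List<String> limitGlobally(List<List<String>> partitions, int limit) {
        List<String> merged = new ArrayList<>();       // "coalesce(1)"
        for (List<String> partition : partitions) {
            merged.addAll(partition);
        }
        Iterator<String> it = merged.iterator();       // single-partition limit pass
        List<String> out = new ArrayList<>();
        while (it.hasNext() && out.size() < limit) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("t1", "t2"),
                Arrays.asList("t3", "t4", "t5"));
        System.out.println(limitGlobally(partitions, 3));   // [t1, t2, t3]
    }
}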

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import scala.Function1;
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+import org.apache.spark.SparkContext;
+import org.apache.spark.TaskContext;
+import org.apache.spark.rdd.RDD;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Converter that loads data via POLoad and converts it to RDD&lt;Tuple>. Abuses
+ * the interface a bit in that there is no input RDD to convert in this case;
+ * instead, the input is the source path of the POLoad. (An illustrative usage
+ * sketch follows this file.)
+ */
+@SuppressWarnings({ "serial" })
+public class LoadConverter implements RDDConverter<Tuple, Tuple, POLoad> {
+    private static Log LOG = LogFactory.getLog(LoadConverter.class);
+
+    private PigContext pigContext;
+    private PhysicalPlan physicalPlan;
+    private SparkContext sparkContext;
+    private JobConf jobConf;
+    private SparkEngineConf sparkEngineConf;
+
+    public LoadConverter(PigContext pigContext, PhysicalPlan physicalPlan,
+            SparkContext sparkContext, JobConf jobConf, SparkEngineConf sparkEngineConf) {
+        this.pigContext = pigContext;
+        this.physicalPlan = physicalPlan;
+        this.sparkContext = sparkContext;
+        this.jobConf = jobConf;
+        this.sparkEngineConf = sparkEngineConf;
+    }
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessorRdds, POLoad op)
+            throws IOException {
+        configureLoader(physicalPlan, op, jobConf);
+
+        // Set the input directory for input formats that are backed by a
+        // filesystem. (Does not apply to HBase, for example).
+        jobConf.set("mapreduce.input.fileinputformat.inputdir",
+                op.getLFile().getFileName());
+
+        // internally set pig.noSplitCombination as true ONLY for
+        // the POLoad operator which has POMergeJoin successor.
+        if (hasMergeJoinSuccessor(op)) {
+            jobConf.set("pig.noSplitCombination", "true");
+        }
+
+
+        //serialize the UDFContext#udfConfs in jobConf
+        UDFContext.getUDFContext().serialize(jobConf);
+
+        //SparkContext.newAPIHadoop will broadcast the jobConf to other worker nodes.
+        //Later in PigInputFormatSpark#createRecordReader, jobConf will be used to
+        //initialize PigContext,UDFContext and SchemaTupleBackend.
+        RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopRDD(
+                jobConf, PigInputFormatSpark.class, Text.class, Tuple.class);
+
+        registerUdfFiles();
+
+        ToTupleFunction ttf = new ToTupleFunction(sparkEngineConf);
+
+        //create SparkCounter and set it for ToTupleFunction
+        boolean disableCounter = jobConf.getBoolean("pig.disable.counter", false);
+        if (!op.isTmpLoad() && !disableCounter) {
+            String counterName = SparkStatsUtil.getCounterName(op);
+            SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();
+            if (counterReporter.getCounters() != null) {
+                counterReporter.getCounters().createCounter(
+                        SparkStatsUtil.SPARK_INPUT_COUNTER_GROUP,
+                        counterName);
+            }
+
+            ttf.setDisableCounter(disableCounter);
+            ttf.setCounterGroupName(SparkStatsUtil.SPARK_INPUT_COUNTER_GROUP);
+            ttf.setCounterName(counterName);
+            ttf.setSparkCounters(SparkPigStatusReporter.getInstance().getCounters());
+        }
+
+        // map to get just RDD<Tuple>
+        return hadoopRDD.map(ttf,
+                SparkUtil.getManifest(Tuple.class));
+    }
+
+    private void registerUdfFiles() throws MalformedURLException{
+        Map<String, File> scriptFiles = pigContext.getScriptFiles();
+        for (Map.Entry<String, File> scriptFile : scriptFiles.entrySet()) {
+            File script = scriptFile.getValue();
+            if (script.exists()) {
+                sparkContext.addFile(script.toURI().toURL().toExternalForm());
+            }
+        }
+    }
+
+    private static class ToTupleFunction extends
+            AbstractFunction1<Tuple2<Text, Tuple>, Tuple> implements
+            Function1<Tuple2<Text, Tuple>, Tuple>, Serializable {
+
+        private String counterGroupName;
+        private String counterName;
+        private SparkCounters sparkCounters;
+        private boolean disableCounter;
+        private SparkEngineConf sparkEngineConf;
+        private boolean initialized;
+
+        public ToTupleFunction(SparkEngineConf sparkEngineConf){
+               this.sparkEngineConf = sparkEngineConf;
+        }
+
+        @Override
+        public Tuple apply(Tuple2<Text, Tuple> v1) {
+            if (!initialized) {
+                long partitionId = TaskContext.get().partitionId();
+                Configuration jobConf = PigMapReduce.sJobConfInternal.get();
+                jobConf.set(PigConstants.TASK_INDEX, Long.toString(partitionId));
+                jobConf.set(MRConfiguration.TASK_ID, Long.toString(partitionId));
+                initialized = true;
+            }
+            if (sparkCounters != null && !disableCounter) {
+                sparkCounters.increment(counterGroupName, counterName, 1L);
+            }
+            return v1._2();
+        }
+
+        public void setCounterGroupName(String counterGroupName) {
+            this.counterGroupName = counterGroupName;
+        }
+
+        public void setCounterName(String counterName) {
+            this.counterName = counterName;
+        }
+
+        public void setSparkCounters(SparkCounters sparkCounters) {
+            this.sparkCounters = sparkCounters;
+        }
+
+        public void setDisableCounter(boolean disableCounter) {
+            this.disableCounter = disableCounter;
+        }
+    }
+
+    /**
+     * Stolen from JobControlCompiler. TODO: refactor so this code can be shared.
+     *
+     * @param physicalPlan physical plan containing the POLoad
+     * @param poLoad       the load operator to configure
+     * @param jobConf      the job configuration to populate with the load's input specs
+     * @return the updated jobConf
+     * @throws java.io.IOException
+     */
+    private static JobConf configureLoader(PhysicalPlan physicalPlan,
+            POLoad poLoad, JobConf jobConf) throws IOException {
+
+        Job job = new Job(jobConf);
+        LoadFunc loadFunc = poLoad.getLoadFunc();
+
+        loadFunc.setLocation(poLoad.getLFile().getFileName(), job);
+
+        // stolen from JobControlCompiler
+        ArrayList<FileSpec> pigInputs = new ArrayList<FileSpec>();
+        // Store the inp filespecs
+        pigInputs.add(poLoad.getLFile());
+
+        ArrayList<List<OperatorKey>> inpTargets = Lists.newArrayList();
+        ArrayList<String> inpSignatures = Lists.newArrayList();
+        ArrayList<Long> inpLimits = Lists.newArrayList();
+        // Store the target operators for tuples read
+        // from this input
+        List<PhysicalOperator> loadSuccessors = physicalPlan
+                .getSuccessors(poLoad);
+        List<OperatorKey> loadSuccessorsKeys = Lists.newArrayList();
+        if (loadSuccessors != null) {
+            for (PhysicalOperator loadSuccessor : loadSuccessors) {
+                loadSuccessorsKeys.add(loadSuccessor.getOperatorKey());
+            }
+        }
+        inpTargets.add(loadSuccessorsKeys);
+        inpSignatures.add(poLoad.getSignature());
+        inpLimits.add(poLoad.getLimit());
+
+        jobConf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(pigInputs));
+        jobConf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets));
+        jobConf.set(PigInputFormat.PIG_INPUT_SIGNATURES,
+                ObjectSerializer.serialize(inpSignatures));
+        jobConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits));
+        return jobConf;
+    }
+
+    private static boolean hasMergeJoinSuccessor(PhysicalOperator op) {
+        if (op == null || op.getParentPlan() == null) {
+            return false;
+        }
+        List<PhysicalOperator> successors = op.getParentPlan().getSuccessors(op);
+        if (successors == null) {
+            return false;
+        }
+        for (PhysicalOperator successor : successors){
+            if (successor instanceof POMergeJoin){
+                return true;
+            }
+            if (hasMergeJoinSuccessor(successor)){
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
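A minimal sketch of how this converter might be driven, assuming a pigContext, physicalPlan, sparkContext, jobConf, sparkEngineConf and a poLoad operator are already in scope (those names are illustrative stand-ins for state held by the surrounding Spark execution engine, not identifiers from this patch):

    // Hypothetical wiring; in the engine itself the plan-to-RDD conversion
    // code constructs the converter and calls convert(), not user code.
    LoadConverter loadConverter = new LoadConverter(
            pigContext, physicalPlan, sparkContext, jobConf, sparkEngineConf);

    // A load has no predecessor RDD: the POLoad's file spec is the input,
    // so an empty predecessor list is passed. convert() may throw IOException.
    List<RDD<Tuple>> predecessors = new ArrayList<RDD<Tuple>>();
    RDD<Tuple> tuples = loadConverter.convert(predecessors, poLoad);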

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.rdd.RDD;
+
+@SuppressWarnings({ "serial" })
+public class LocalRearrangeConverter implements
+        RDDConverter<Tuple, Tuple, PhysicalOperator> {
+    private static final Log LOG = LogFactory
+            .getLog(LocalRearrangeConverter.class);
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            PhysicalOperator physicalOperator) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        // call local rearrange to get key and value
+        return rdd.map(new LocalRearrangeFunction(physicalOperator),
+                SparkUtil.getManifest(Tuple.class));
+
+    }
+
+    private static class LocalRearrangeFunction extends
+            AbstractFunction1<Tuple, Tuple> implements Serializable {
+
+        private final PhysicalOperator physicalOperator;
+
+        public LocalRearrangeFunction(PhysicalOperator physicalOperator) {
+            this.physicalOperator = physicalOperator;
+        }
+
+        @Override
+        public Tuple apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LocalRearrangeFunction in " + t);
+            }
+            Result result;
+            try {
+                physicalOperator.setInputs(null);
+                physicalOperator.attachInput(t);
+                result = physicalOperator.getNextTuple();
+
+                if (result == null) {
+                    throw new RuntimeException(
+                            "Null response found for LocalRearange on tuple: "
+                                    + t);
+                }
+
+                switch (result.returnStatus) {
+                case POStatus.STATUS_OK:
+                    // (index, key, value without keys)
+                    Tuple resultTuple = (Tuple) result.result;
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("LocalRearrangeFunction out " + resultTuple);
+                    }
+                    return resultTuple;
+                default:
+                    throw new RuntimeException(
+                            "Unexpected response code from operator "
+                                    + physicalOperator + " : " + result);
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException(
+                        "Couldn't do LocalRearange on tuple: " + t, e);
+            }
+        }
+
+    }
+
+}
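The comment in LocalRearrangeFunction above notes that a successful result is a tuple of the form (index, key, value without the key fields). Below is a hedged, hand-built illustration of that shape using Pig's TupleFactory; the operator constructs this tuple internally, and the literal values are made up for the example:

    // Illustrative only; Tuple.set(...) declares ExecException, so run this
    // from code that handles or rethrows it.
    TupleFactory tf = TupleFactory.getInstance();
    Tuple rearranged = tf.newTuple(3);
    rearranged.set(0, (byte) 0);      // index of the input this tuple came from
    rearranged.set(1, "alice");       // the group/join key
    rearranged.set(2, tf.newTuple()); // the remaining value fields, key stripped out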

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+
+public class MergeCogroupConverter implements RDDConverter<Tuple, Tuple, POMergeCogroup> {
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+                              POMergeCogroup physicalOperator) {
+        SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        MergeCogroupFunction mergeCogroupFunction = new MergeCogroupFunction(physicalOperator);
+        return rdd.toJavaRDD().mapPartitions(mergeCogroupFunction, true).rdd();
+    }
+
+    private static class MergeCogroupFunction implements
+            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+
+        private POMergeCogroup poMergeCogroup;
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple> input) throws Exception {
+            return new Iterable<Tuple>() {
+
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poMergeCogroup.setInputs(null);
+                            poMergeCogroup.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return poMergeCogroup.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                            poMergeCogroup.setEndOfInput(true);
+                        }
+                    };
+                }
+            };
+        }
+
+        private MergeCogroupFunction(POMergeCogroup poMergeCogroup) {
+            this.poMergeCogroup = poMergeCogroup;
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+
+@SuppressWarnings("serial")
+public class MergeJoinConverter implements
+        RDDConverter<Tuple, Tuple, POMergeJoin> {
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+                              POMergeJoin poMergeJoin) throws IOException {
+
+        SparkUtil.assertPredecessorSize(predecessors, poMergeJoin, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin);
+
+        return rdd.toJavaRDD().mapPartitions(mergeJoinFunction, true).rdd();
+    }
+
+    private static class MergeJoinFunction implements
+            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+
+        private POMergeJoin poMergeJoin;
+
+        private MergeJoinFunction(POMergeJoin poMergeJoin) {
+            this.poMergeJoin = poMergeJoin;
+        }
+
+        public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+            return new Iterable<Tuple>() {
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poMergeJoin.setInputs(null);
+                            poMergeJoin.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return poMergeJoin.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                            poMergeJoin.setEndOfInput(true);
+                        }
+                    };
+                }
+            };
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java Mon May 29 15:00:39 2017
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.Tuple;
+
+abstract class OutputConsumerIterator implements java.util.Iterator<Tuple> {
+    private final java.util.Iterator<Tuple> input;
+    private Result result = null;
+    private boolean returned = true;
+    private boolean done = false;
+
+    OutputConsumerIterator(java.util.Iterator<Tuple> input) {
+        this.input = input;
+    }
+
+    protected abstract void attach(Tuple tuple);
+
+    protected abstract Result getNextResult() throws ExecException;
+
+    /**
+     * Certain operators may buffer their output. When we encounter the last
+     * input record, we need to flush the last set of records from such
+     * operators before calling getNextTuple() for the final time. (An
+     * illustrative consumption sketch follows this file.)
+     */
+    protected abstract void endOfInput();
+
+    private void readNext() {
+        while (true) {
+            try {
+                // result is set in hasNext() call and returned
+                // to the user in next() call
+                if (result != null && !returned) {
+                    return;
+                }
+
+                if (result == null) {
+                    if (!input.hasNext()) {
+                        done = true;
+                        return;
+                    }
+                    Tuple v1 = input.next();
+                    attach(v1);
+                }
+
+                if (!input.hasNext()) {
+                    endOfInput();
+                }
+
+                result = getNextResult();
+                returned = false;
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        returned = false;
+                        break;
+                    case POStatus.STATUS_NULL:
+                        returned = true;
+                        break;
+                    case POStatus.STATUS_EOP:
+                        done = !input.hasNext();
+                        if (!done) {
+                            result = null;
+                        }
+                        break;
+                    case POStatus.STATUS_ERR:
+                        throw new RuntimeException("Error while processing " + result);
+                }
+
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        readNext();
+        return !done;
+    }
+
+    @Override
+    public Tuple next() {
+        readNext();
+        if (done) {
+            throw new RuntimeException("Past the end. Call hasNext() before calling next()");
+        }
+        if (result == null || result.returnStatus != POStatus.STATUS_OK) {
+            throw new RuntimeException("Unexpected response code from operator: "
+                    + result);
+        }
+        returned = true;
+        return (Tuple) result.result;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file
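A hedged sketch of the contract this iterator enforces, using the same anonymous-subclass pattern as the converters above. The operator "op" is an assumed, already-configured PhysicalOperator rather than something defined in this patch, and since OutputConsumerIterator is package-private such a caller would live in the converter package:

    // hasNext() attaches input tuples and pulls Results from the operator;
    // next() may only be called while hasNext() is true.
    java.util.Iterator<Tuple> out = new OutputConsumerIterator(input) {
        @Override
        protected void attach(Tuple tuple) {
            op.setInputs(null);
            op.attachInput(tuple);
        }

        @Override
        protected Result getNextResult() throws ExecException {
            return op.getNextTuple();
        }

        @Override
        protected void endOfInput() {
            // flush operators that buffer output; a no-op if the operator does not buffer
        }
    };

    while (out.hasNext()) {
        Tuple t = out.next();   // always backed by a STATUS_OK result here
        // ... consume t ...
    }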


