Subject: svn commit: r1730995 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/plan/ src/org/apache/pig/backend/hadoop/executionengine/spark/running/ test/org/apache/pig/t...
Date: Thu, 18 Feb 2016 02:43:40 -0000
To: commits@pig.apache.org
From: xuefu@apache.org
X-Mailer: svnmailer-1.0.9
Message-Id: <20160218024340.912A93A0249@svn01-us-west.apache.org>

Author: xuefu
Date: Thu Feb 18 02:43:40 2016
New Revision: 1730995

URL: http://svn.apache.org/viewvc?rev=1730995&view=rev
Log:
PIG-4601: Implement Merge CoGroup for Spark engine (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
    pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1730995&r1=1730994&r2=1730995&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Feb 18 02:43:40 2016
@@ -55,6 +55,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
@@ -76,6 +77,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeCogroupConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
@@ -202,6 +204,7 @@ public class SparkLauncher extends Launc
         convertMap.put(PORank.class, new RankConverter());
         convertMap.put(POStream.class, new StreamConverter(confBytes));
         convertMap.put(POFRJoin.class, new FRJoinConverter());
+        convertMap.put(POMergeCogroup.class, new MergeCogroupConverter());
         convertMap.put(POReduceBySpark.class, new ReduceByConverter());
         convertMap.put(POPreCombinerLocalRearrange.class,
                 new LocalRearrangeConverter());
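
For context: convertMap is the lookup table the Spark launcher consults while walking the Spark plan, turning each physical operator into an RDD transformation. A rough sketch of that dispatch is below; the enclosing method and variable names are illustrative (only convertMap, POMergeCogroup and MergeCogroupConverter are taken from this diff), and the imports are the ones already shown above.

    // Illustrative sketch of the convertMap dispatch, not the actual SparkLauncher code.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private RDD<Tuple> convertOperator(PhysicalOperator physicalOp, List<RDD<Tuple>> predecessors)
            throws IOException {
        RDDConverter converter = convertMap.get(physicalOp.getClass());
        if (converter == null) {
            // Before this change a POMergeCogroup operator had no entry and ended up here.
            throw new IOException("Spark engine does not support "
                    + physicalOp.getClass().getSimpleName());
        }
        return converter.convert(predecessors, physicalOp);
    }

With the new entry registered, a POMergeCogroup produced by the compiler resolves to MergeCogroupConverter instead of falling into the unsupported-operator path.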
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1730995&r1=1730994&r2=1730995&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Thu Feb 18 02:43:40 2016
@@ -18,29 +18,38 @@
 package org.apache.pig.backend.hadoop.executionengine.spark;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
-import java.util.UUID;
+
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.rdd.RDD;
 
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
 public class SparkUtil {
 
     public static <T> ClassTag<T> getManifest(Class<T> clazz) {
@@ -119,4 +128,25 @@ public class SparkUtil {
             return new MapReducePartitionerWrapper(customPartitioner, parallelism);
         }
     }
-}
+
+    // createIndexerSparkNode is a utility to create an indexer spark node with baseSparkOp
+    static public void createIndexerSparkNode(SparkOperator baseSparkOp, String scope, NodeIdGenerator nig) throws PlanException, ExecException {
+        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+        PhysicalPlan ep = new PhysicalPlan();
+        POProject prj = new POProject(new OperatorKey(scope,
+                nig.getNextNodeId(scope)));
+        prj.setStar(true);
+        prj.setOverloaded(false);
+        prj.setResultType(DataType.TUPLE);
+        ep.add(prj);
+        eps.add(ep);
+
+        List<Boolean> ascCol = new ArrayList<Boolean>();
+        ascCol.add(true);
+
+        int requestedParallelism = baseSparkOp.requestedParallelism;
+        POSort sort = new POSort(new OperatorKey(scope, nig.getNextNodeId(scope)), requestedParallelism, null, eps, ascCol, null);
+        // POSort is added to sort the index tuples generated by MergeJoinIndexer. For more detail, see PIG-4601
+        baseSparkOp.physicalPlan.addAsLeaf(sort);
+    }
+}
\ No newline at end of file
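
The new createIndexerSparkNode helper appends a project(*) followed by an ascending POSort as the leaf of the given operator's physical plan, so the index tuples emitted by MergeJoinIndexer come out in key order. A hedged usage sketch follows; the wrapper method is hypothetical, and SparkCompiler.getIndexingJob below is the real caller.

    // Hypothetical wrapper, only to show the calling convention of the helper above.
    static void sortIndexEntries(SparkOperator indexerSparkOp) throws PlanException, ExecException {
        String scope = indexerSparkOp.getOperatorKey().scope;   // assumption: reuse the operator's scope
        NodeIdGenerator nig = NodeIdGenerator.getGenerator();
        // Leaves indexerSparkOp.physicalPlan ending in ... -> POSort(project *, ascending)
        SparkUtil.createIndexerSparkNode(indexerSparkOp, scope, nig);
    }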
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1730995&r1=1730994&r2=1730995&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Thu Feb 18 02:43:40 2016
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.plan;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,10 +31,16 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.CollectableLoadFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -47,6 +54,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -60,11 +68,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
-import org.apache.pig.builtin.LOG;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -73,6 +81,8 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Utils;
 
 /**
  * The compiler that compiles a given physical physicalPlan into a DAG of Spark
@@ -801,4 +811,173 @@ public class SparkCompiler extends PhyPl
             finPlan.merge(e);
         }
     }
+
+    @Override
+    public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
+        if (compiledInputs.length < 2) {
+            int errCode = 2251;
+            String errMsg = "Merge Cogroup works on two or more relations. " +
+                    "To use map-side group-by on single relation, use 'collected' qualifier.";
+            throw new SparkCompilerException(errMsg, errCode);
+        }
+
+        List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length - 1);
+        List<String> fileSpecs = new ArrayList<String>(compiledInputs.length - 1);
+        List<String> loaderSigns = new ArrayList<String>(compiledInputs.length - 1);
+
+        try {
+            poCoGrp.setEndOfRecordMark(POStatus.STATUS_NULL);
+
+            // Iterate through all the SparkOperators, disconnect the side SparkOperators from
+            // the spark plan and collect all the information needed in different lists.
+
+            for (int i = 0; i < compiledInputs.length; i++) {
+                SparkOperator sparkOper = compiledInputs[i];
+                PhysicalPlan plan = sparkOper.physicalPlan;
+                if (plan.getRoots().size() != 1) {
+                    int errCode = 2171;
+                    String errMsg = "Expected one but found more than one root physical operator in physical plan.";
+                    throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
+                }
+
+                PhysicalOperator rootPOOp = plan.getRoots().get(0);
+                if (!(rootPOOp instanceof POLoad)) {
+                    int errCode = 2172;
+                    String errMsg = "Expected physical operator at root to be POLoad. Found : " + rootPOOp.getClass().getCanonicalName();
+                    throw new SparkCompilerException(errMsg, errCode);
+                }
+
+                POLoad sideLoader = (POLoad) rootPOOp;
+                FileSpec loadFileSpec = sideLoader.getLFile();
+                FuncSpec funcSpec = loadFileSpec.getFuncSpec();
+                LoadFunc loadfunc = sideLoader.getLoadFunc();
+                if (i == 0) {
+
+                    if (!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
+                        int errCode = 2252;
+                        throw new SparkCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.", errCode);
+                    }
+
+                    ((CollectableLoadFunc) loadfunc).ensureAllKeyInstancesInSameSplit();
+                    continue;
+                }
+                if (!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
+                    int errCode = 2253;
+                    throw new SparkCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.", errCode);
+                }
+
+                funcSpecs.add(funcSpec);
+                fileSpecs.add(loadFileSpec.getFileName());
+                loaderSigns.add(sideLoader.getSignature());
+                sparkPlan.remove(sparkOper);
+            }
+
+            poCoGrp.setSideLoadFuncs(funcSpecs);
+            poCoGrp.setSideFileSpecs(fileSpecs);
+            poCoGrp.setLoaderSignatures(loaderSigns);
+
+            // Use spark operator of base relation for the cogroup operation.
+            SparkOperator baseSparkOp = phyToSparkOpMap.get(poCoGrp.getInputs().get(0));
+
+            // Create a spark operator to generate index file for tuples from leftmost relation
+            SparkOperator indexerSparkOp = getSparkOp();
+            FileSpec idxFileSpec = getIndexingJob(indexerSparkOp, baseSparkOp, poCoGrp.getLRInnerPlansOf(0));
+            poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
+            poCoGrp.setIndexFileName(idxFileSpec.getFileName());
+
+            baseSparkOp.physicalPlan.addAsLeaf(poCoGrp);
+            for (FuncSpec funcSpec : funcSpecs)
+                baseSparkOp.UDFs.add(funcSpec.toString());
+
+            sparkPlan.add(indexerSparkOp);
+            sparkPlan.connect(indexerSparkOp, baseSparkOp);
+            phyToSparkOpMap.put(poCoGrp, baseSparkOp);
+            curSparkOp = baseSparkOp;
+        } catch (ExecException e) {
+            throw new SparkCompilerException(e.getDetailedMessage(), e.getErrorCode(), e.getErrorSource(), e);
+        } catch (SparkCompilerException mrce) {
+            throw (mrce);
+        } catch (CloneNotSupportedException e) {
+            throw new SparkCompilerException(e);
+        } catch (PlanException e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + poCoGrp.getClass().getCanonicalName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        } catch (IOException e) {
+            int errCode = 3000;
+            String errMsg = "IOException caught while compiling POMergeCoGroup";
+            throw new SparkCompilerException(errMsg, errCode, e);
+        }
+    }
+
+    // Sets up the indexing job for single-stage cogroups.
+    private FileSpec getIndexingJob(SparkOperator indexerSparkOp,
+            final SparkOperator baseSparkOp, final List<PhysicalPlan> mapperLRInnerPlans)
+            throws SparkCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
+
+        // First replace loader with MergeJoinIndexer.
+        PhysicalPlan baseMapPlan = baseSparkOp.physicalPlan;
+        POLoad baseLoader = (POLoad) baseMapPlan.getRoots().get(0);
+        FileSpec origLoaderFileSpec = baseLoader.getLFile();
+        FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
+        LoadFunc loadFunc = baseLoader.getLoadFunc();
+
+        if (!(OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))) {
+            int errCode = 1104;
+            String errMsg = "Base relation of merge-coGroup must implement " +
+                    "OrderedLoadFunc interface. The specified loader "
+                    + funcSpec + " doesn't implement it";
+            throw new SparkCompilerException(errMsg, errCode);
+        }
+
+        String[] indexerArgs = new String[6];
+        indexerArgs[0] = funcSpec.toString();
+        indexerArgs[1] = ObjectSerializer.serialize((Serializable) mapperLRInnerPlans);
+        indexerArgs[3] = baseLoader.getSignature();
+        indexerArgs[4] = baseLoader.getOperatorKey().scope;
+        indexerArgs[5] = Boolean.toString(false); // we care for nulls.
+
+        PhysicalPlan phyPlan;
+        if (baseMapPlan.getSuccessors(baseLoader) == null
+                || baseMapPlan.getSuccessors(baseLoader).isEmpty()) {
+            // Load-Load-Cogroup case.
+            phyPlan = null;
+        } else { // We got something. Yank it and set it as inner plan.
+            phyPlan = baseMapPlan.clone();
+            PhysicalOperator root = phyPlan.getRoots().get(0);
+            phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
+            phyPlan.remove(root);
+
+        }
+        indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
+
+        POLoad idxJobLoader = getLoad();
+        idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
+                new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
+        indexerSparkOp.physicalPlan.add(idxJobLoader);
+        indexerSparkOp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
+
+        // Loader of sparkOp will return a tuple of form -
+        // (key1, key2, .. , WritableComparable, splitIndex).
+        // See MergeJoinIndexer for details.
+        // Create a spark node to retrieve index file by MergeJoinIndexer
+        SparkUtil.createIndexerSparkNode(indexerSparkOp, scope, nig);
+
+        POStore st = getStore();
+        FileSpec strFile = getTempFileSpec();
+        st.setSFile(strFile);
+        indexerSparkOp.physicalPlan.addAsLeaf(st);
+
+        return strFile;
+    }
+
+    /**
+     * Returns a temporary DFS Path
+     *
+     * @return
+     * @throws IOException
+     */
+    private FileSpec getTempFileSpec() throws IOException {
+        return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
+                new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1730995&r1=1730994&r2=1730995&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Thu Feb 18 02:43:40 2016
@@ -22,13 +22,17 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -40,12 +44,18 @@ public class PigInputFormatSpark extends
             InterruptedException {
         init();
         resetUDFContext();
-        RecordReader<Text, Tuple> recordReader = super.createRecordReader(split, context);
         //PigSplit#conf is the default hadoop configuration, we need get the configuration
         //from context.getConfigration() to retrieve pig properties
         PigSplit pigSplit = (PigSplit) split;
-        pigSplit.setConf(context.getConfiguration());
-        return recordReader;
+        Configuration conf = context.getConfiguration();
+        pigSplit.setConf(conf);
+        //Set the current splitIndex in PigMapReduce.sJobContext.getConfiguration().get(PigImplConstants.PIG_SPLIT_INDEX),
+        //which will be used in POMergeCogroup#setup
+        if (PigMapReduce.sJobContext == null) {
+            PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
+        }
+        PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
+        return super.createRecordReader(split, context);
     }
 
     private void resetUDFContext() {
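
The PigInputFormatSpark change above exists because POMergeCogroup#setup reads the current split index out of PigMapReduce.sJobContext, which the Spark engine never populates the way an MR task would. A simplified sketch of that consumer side follows, assuming the imports shown in the diff; the method body is illustrative, not the actual POMergeCogroup code.

    // Illustrative reader side: why PigInputFormatSpark must populate sJobContext.
    private int currentSplitIndex() {
        Configuration jobConf = PigMapReduce.sJobContext.getConfiguration();
        // Without the change above, sJobContext would be null under Spark and this would NPE.
        return jobConf.getInt(PigImplConstants.PIG_SPLIT_INDEX, 0);
    }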
Modified: pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java?rev=1730995&r1=1730994&r2=1730995&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java Thu Feb 18 02:43:40 2016
@@ -311,12 +311,11 @@ public class TestMapSideCogroup {
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "," + INPUT_FILE4 + "' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray,c2:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray,c2:int);");
-        DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag();
+        List<Tuple> dbMergeCogrp = new ArrayList<Tuple>();
         pigServer.registerQuery("C = cogroup A by c1, B by c1 using 'merge';");
         Iterator<Tuple> iter = pigServer.openIterator("C");
-
         while(iter.hasNext()) {
             Tuple t = iter.next();
             dbMergeCogrp.add(t);
@@ -335,12 +334,29 @@ public class TestMapSideCogroup {
                 "(3,{(3,3),(3,2),(3,1)},{(3,1),(3,2),(3,3)})" };
-        assertEquals(9, dbMergeCogrp.size());
+        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(results);
+
+        //We need to sort dbMergeCogrp because the result order differs between spark and other modes when
+        //multiple files are loaded (LOAD INPUT_FILE1,INPUT_FILE4...)
+        for (Tuple t : dbMergeCogrp) {
+            Util.convertBagToSortedBag(t);
+        }
+        for (Tuple t : expected) {
+            Util.convertBagToSortedBag(t);
+        }
+
+        Collections.sort(dbMergeCogrp);
+        Collections.sort(expected);
+        assertEquals(dbMergeCogrp.size(), expected.size());
+
+        //Since TestMapSideCogroup.DummyIndexableLoader.getNext() does not
+        //apply the schema to each input tuple, Util#checkQueryOutputsAfterSortRecursive fails to assert.
+        //The schema for C is (int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,int),(chararray,int),(chararray,int)}).
+        //But the schema for the result "dbMergeCogrp" is (int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,chararray),(chararray,chararray),(chararray,chararray)})
         Iterator<Tuple> itr = dbMergeCogrp.iterator();
-        for(int i=0; i<9; i++){
-            assertEquals(itr.next().toString(), results[i]);
+        for (int i = 0; i < dbMergeCogrp.size(); i++) {
+            assertEquals(itr.next().toString(), expected.get(i).toString());
         }
-        assertFalse(itr.hasNext());
     }
 
     @Test
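
The test now collects results into a List<Tuple> and compares them order-insensitively, because Spark may return the cogrouped tuples in a different order than MR when several input files are loaded. The same pattern, factored into a hypothetical helper (not part of this commit), assuming the test's existing imports plus java.util.Collections:

    // Hypothetical helper showing the order-insensitive comparison used above.
    private static void assertTuplesEqualIgnoringOrder(List<Tuple> actual, List<Tuple> expected) {
        for (Tuple t : actual) {
            Util.convertBagToSortedBag(t);          // make inner bag order deterministic
        }
        for (Tuple t : expected) {
            Util.convertBagToSortedBag(t);
        }
        Collections.sort(actual);                   // make top-level tuple order deterministic
        Collections.sort(expected);
        assertEquals(expected.size(), actual.size());
        for (int i = 0; i < expected.size(); i++) {
            // compare string forms: the dummy loaders do not apply the declared schema,
            // so a typed, recursive comparison would report spurious mismatches
            assertEquals(expected.get(i).toString(), actual.get(i).toString());
        }
    }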