Subject: svn commit: r907760 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
Date: Mon, 08 Feb 2010 19:19:38 -0000
From: hashutosh@apache.org
To: pig-commits@incubator.apache.org
Reply-To: pig-dev@hadoop.apache.org

Author: hashutosh
Date: Mon Feb 8 19:19:37 2010
New Revision: 907760

URL: http://svn.apache.org/viewvc?rev=907760&view=rev
Log:
PIG-1224: Collected group should change to use new (internal) bag

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Feb 8 19:19:37 2010
@@ -24,6 +24,8 @@
 IMPROVEMENTS
 
+PIG-1224: Collected group should change to use new (internal) bag (ashutoshc)
+
 PIG-1046: join algorithm specification is within double quotes (ashutoshc)
 
 PIG-1209: Port POJoinPackage to proactively spill (ashutoshc)
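As the log line says, the grouping operators below switch from the default in-memory bag to Pig's proactively spilling InternalCachedBag. The choice is driven by the job property "pig.cachedbag.type", which the patched operators read from PigMapReduce.sJobConf. A minimal, hypothetical sketch of setting that property on a Hadoop job configuration (the class name is illustrative and not part of this commit):

    import org.apache.hadoop.conf.Configuration;

    public class CachedBagTypeConfig {
        public static void main(String[] args) {
            // "default" keeps the old non-spilling default bag; any other value
            // (or leaving the property unset) selects InternalCachedBag.
            Configuration conf = new Configuration();
            conf.set("pig.cachedbag.type", "default");
            System.out.println("pig.cachedbag.type = " + conf.get("pig.cachedbag.type"));
        }
    }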
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Mon Feb 8 19:19:37 2010
@@ -22,6 +22,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -31,6 +32,7 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -69,6 +71,8 @@
 
     private Object prevKey = null;
 
+    private boolean useDefaultBag = false;
+
     public POCollectedGroup(OperatorKey k) {
         this(k, -1, null);
     }
@@ -199,8 +203,24 @@
             // the first time, just create a new buffer and continue.
             if (prevKey == null && outputBag == null) {
+
+                if (PigMapReduce.sJobConf != null) {
+                    String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+                    if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                        useDefaultBag = true;
+                    }
+                }
                 prevKey = curKey;
-                outputBag = BagFactory.getInstance().newDefaultBag();
+                outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+                // In a very rare case if there is a POStream after this
+                // POCollectedGroup in the pipeline and is also blocking the pipeline;
+                // constructor argument should be 2. But for one obscure
+                // case we don't want to pay the penalty all the time.
+
+                // Additionally, if there is a merge join(on a different key) following POCollectedGroup
+                // default bags should be used. But since we don't allow anything
+                // before Merge Join currently we are good.
+                        : new InternalCachedBag(1);
                 outputBag.add((Tuple)tup.get(1));
                 continue;
             }
@@ -224,7 +244,8 @@
             res.result = tup2;
             prevKey = curKey;
-            outputBag = BagFactory.getInstance().newDefaultBag();
+            outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+                    : new InternalCachedBag(1);
             outputBag.add((Tuple)tup.get(1));
             return res;
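The core of the POCollectedGroup change is the choice between BagFactory's default in-memory bag and an InternalCachedBag that proactively spills. A minimal standalone sketch of that selection using Pig's public data API (the class name and command-line argument are hypothetical; the real operator drives the flag from pig.cachedbag.type):

    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.InternalCachedBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class BagSelectionSketch {
        public static void main(String[] args) throws Exception {
            // Mirrors the patched operator: "default" keeps the old in-memory bag,
            // anything else uses the proactively spilling InternalCachedBag.
            String bagType = args.length > 0 ? args[0] : "internal";
            boolean useDefaultBag = bagType.equalsIgnoreCase("default");

            DataBag outputBag = useDefaultBag
                    ? BagFactory.getInstance().newDefaultBag()
                    : new InternalCachedBag(1); // 1 = number of inputs feeding this bag

            Tuple t = TupleFactory.getInstance().newTuple(1);
            t.set(0, "some value");
            outputBag.add(t);

            System.out.println(outputBag.getClass().getSimpleName()
                    + " holds " + outputBag.size() + " tuple(s)");
        }
    }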
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java Mon Feb 8 19:19:37 2010
@@ -25,6 +25,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalCachedBag;
@@ -44,6 +45,8 @@
     private boolean lastInputTuple = false;
     private static final Tuple t1 = null;
     private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);
+    private boolean firstTime = true;
+    private boolean useDefaultBag = false;
 
     public static final String DEFAULT_CHUNK_SIZE = "1000";
@@ -100,6 +103,16 @@
      */
     @Override
     public Result getNext(Tuple t) throws ExecException {
+
+        if(firstTime){
+            firstTime = false;
+            if (PigMapReduce.sJobConf != null) {
+                String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    useDefaultBag = true;
+                }
+            }
+        }
         // if a previous call to foreach.getNext()
         // has still not returned all output, process it
         if (forEach.processingPlan)
@@ -126,17 +139,14 @@
         {
             lastInputTuple = false;
             //Put n-1 inputs into bags
-            String bagType = null;
-            if (PigMapReduce.sJobConf != null) {
-                bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
-            }
             dbs = new DataBag[numInputs];
             for (int i = 0; i < numInputs; i++) {
-                if (bagType != null && bagType.equalsIgnoreCase("default")) {
-                    dbs[i] = mBagFactory.newDefaultBag();
-                } else {
-                    dbs[i] = new InternalCachedBag(numInputs);
-                }
+                dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+                // In a very rare case if there is a POStream after this
+                // POJoinPackage in the pipeline and is also blocking the pipeline;
+                // constructor argument should be 2 * numInputs. But for one obscure
+                // case we don't want to pay the penalty all the time.
+                        : new InternalCachedBag(numInputs);
             }
 
             //For each Nullable tuple in the input, put it
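Both POJoinPackage above and POPackage below also hoist the pig.cachedbag.type lookup out of the per-record path: the property is consulted once, on the first getNext() call, and cached in a boolean. A minimal sketch of that pattern, assuming a standalone class with hypothetical names (the real operators read PigMapReduce.sJobConf rather than an injected Configuration):

    import org.apache.hadoop.conf.Configuration;

    public class LazyBagTypeCheck {
        private boolean firstTime = true;
        private boolean useDefaultBag = false;
        private final Configuration conf; // stands in for PigMapReduce.sJobConf

        public LazyBagTypeCheck(Configuration conf) {
            this.conf = conf;
        }

        // Called once per record; the configuration is consulted only on the first call.
        public boolean useDefaultBag() {
            if (firstTime) {
                firstTime = false;
                if (conf != null) {
                    String bagType = conf.get("pig.cachedbag.type");
                    if (bagType != null && bagType.equalsIgnoreCase("default")) {
                        useDefaultBag = true;
                    }
                }
            }
            return useDefaultBag;
        }
    }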
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=907760&r1=907759&r2=907760&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Mon Feb 8 19:19:37 2010
@@ -115,6 +115,10 @@
 
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    private boolean firstTime = true;
+
+    private boolean useDefaultBag = false;
 
     public POPackage(OperatorKey k) {
         this(k, -1, null);
@@ -211,6 +215,17 @@
     @Override
     public Result getNext(Tuple t) throws ExecException {
         Tuple res;
+
+        if(firstTime){
+            firstTime = false;
+            if (PigMapReduce.sJobConf != null) {
+                String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    useDefaultBag = true;
+                }
+            }
+        }
+
         if(distinct) {
             // only set the key which has the whole
             // tuple
@@ -232,20 +247,14 @@
         } else {
             // create bag to pull all tuples out of iterator
-            String bagType = null;
-            if (PigMapReduce.sJobConf != null) {
-                bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
-            }
-
-
-            for (int i = 0; i < numInputs; i++) {
-                if (bagType != null && bagType.equalsIgnoreCase("default")) {
-                    dbs[i] = mBagFactory.newDefaultBag();
-                } else {
-                    dbs[i] = new InternalCachedBag(numInputs);
-                }
-            }
-
+            for (int i = 0; i < numInputs; i++) {
+                dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+                // In a very rare case if there is a POStream after this
+                // POPackage in the pipeline and is also blocking the pipeline;
+                // constructor argument should be 2 * numInputs. But for one obscure
+                // case we don't want to pay the penalty all the time.
+                        : new InternalCachedBag(numInputs);
+            }
             //For each indexed tup in the inp, sort them
             //into their corresponding bags based
             //on the index