Return-Path: X-Original-To: apmail-pig-commits-archive@www.apache.org Delivered-To: apmail-pig-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1566A18EBD for ; Wed, 13 Jan 2016 18:44:58 +0000 (UTC) Received: (qmail 16407 invoked by uid 500); 13 Jan 2016 18:44:57 -0000 Delivered-To: apmail-pig-commits-archive@pig.apache.org Received: (qmail 16373 invoked by uid 500); 13 Jan 2016 18:44:57 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 16364 invoked by uid 99); 13 Jan 2016 18:44:57 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Jan 2016 18:44:57 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 5BF24C1796 for ; Wed, 13 Jan 2016 18:44:57 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.246 X-Spam-Level: * X-Spam-Status: No, score=1.246 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.554] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id HSNUrzumEwtl for ; Wed, 13 Jan 2016 18:44:55 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with ESMTP id 9AF702306B for ; Wed, 13 Jan 2016 18:44:55 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 13617E0760 for ; Wed, 13 Jan 2016 18:44:55 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 115A83A01DB for ; Wed, 13 Jan 2016 18:44:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1724477 - in /pig/trunk/src/org/apache/pig/backend/hadoop/executionengine: physicalLayer/ physicalLayer/relationalOperators/ tez/plan/operator/ Date: Wed, 13 Jan 2016 18:44:54 -0000 To: commits@pig.apache.org From: rohini@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20160113184455.115A83A01DB@svn01-us-west.apache.org> Author: rohini Date: Wed Jan 13 18:44:54 2016 New Revision: 1724477 URL: http://svn.apache.org/viewvc?rev=1724477&view=rev Log: Fix test failures for PIG-4737 Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1724477&r1=1724476&r2=1724477&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Wed Jan 13 18:44:54 2016 @@ -407,7 +407,7 @@ public abstract class PhysicalOperator e public Result getNextDataBag() throws ExecException { Result val = new Result(); - DataBag tmpBag = BagFactory.getInstance().newDefaultBag(); + DataBag tmpBag = mBagFactory.newDefaultBag(); for (Result ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) { if (ret.returnStatus == POStatus.STATUS_ERR) { return ret; Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1724477&r1=1724476&r2=1724477&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Wed Jan 13 18:44:54 2016 @@ -238,19 +238,17 @@ public class POPartialAgg extends Physic doSpill = false; doContingentSpill = false; } - if (result.returnStatus != POStatus.STATUS_EOP - || inputsExhausted) { + if (result.returnStatus != POStatus.STATUS_EOP) { + return result; + } else if (inputsExhausted) { + freeMemory(); return result; } } if (mapAggDisabled()) { // disableMapAgg() sets doSpill, so we can't get here while there is still contents in the buffered maps. // if we get to this point, everything is flushed, so we can simply return the raw tuples from now on. - if (rawInputMap != null) { - // Free up the maps for garbage collection - rawInputMap = null; - processedInputMap = null; - } + freeMemory(); return processInput(); } else { Result inp = processInput(); @@ -292,6 +290,15 @@ public class POPartialAgg extends Physic } } + private void freeMemory() throws ExecException { + if (rawInputMap != null && !rawInputMap.isEmpty()) { + throw new ExecException("Illegal state. Trying to free up partial aggregation maps when they are not empty"); + } + // Free up the maps for garbage collection + rawInputMap = null; + processedInputMap = null; + } + private void estimateMemThresholds() { if (!mapAggDisabled()) { LOG.info("Getting mem limits; considering " + ALL_POPARTS.size() Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1724477&r1=1724476&r2=1724477&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Wed Jan 13 18:44:54 2016 @@ -254,10 +254,10 @@ public class POSort extends PhysicalOper @Override public Result getNextTuple() throws ExecException { - Result res = new Result(); + Result inp; if (!inputsAccumulated) { - res = processInput(); + inp = processInput(); if (!initialized) { initialized = true; if (PigMapReduce.sJobConfInternal.get() != null) { @@ -272,23 +272,25 @@ public class POSort extends PhysicalOper sortedBag = useDefaultBag ? mBagFactory.newSortedBag(mComparator) : new InternalSortedBag(3, mComparator); - while (res.returnStatus != POStatus.STATUS_EOP) { - if (res.returnStatus == POStatus.STATUS_ERR) { + while (inp.returnStatus != POStatus.STATUS_EOP) { + if (inp.returnStatus == POStatus.STATUS_ERR) { log.error("Error in reading from the inputs"); - return res; - } else if (res.returnStatus == POStatus.STATUS_NULL) { + return inp; + } else if (inp.returnStatus == POStatus.STATUS_NULL) { // Ignore and read the next tuple. - res = processInput(); + inp = processInput(); continue; } - sortedBag.add((Tuple) res.result); - res = processInput(); + sortedBag.add((Tuple) inp.result); + inp = processInput(); } inputsAccumulated = true; } - if (it == null) { + + Result res = new Result(); + if (it == null) { it = sortedBag.iterator(); } if (it.hasNext()) { @@ -299,7 +301,7 @@ public class POSort extends PhysicalOper res.returnStatus = POStatus.STATUS_EOP; reset(); } - return res; + return res; } @Override Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1724477&r1=1724476&r2=1724477&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Wed Jan 13 18:44:54 2016 @@ -137,10 +137,7 @@ public class POSimpleTezLoad extends POL if (finished) { return RESULT_EOP; } - Result res = new Result(); if (!reader.next()) { - res.result = null; - res.returnStatus = POStatus.STATUS_EOP; // For certain operators (such as STREAM), we could still have some work // to do even after seeing the last input. These operators set a flag that // says all input has been sent and to run the pipeline one more time. @@ -148,15 +145,17 @@ public class POSimpleTezLoad extends POL this.parentPlan.endOfAllInput = true; } finished = true; + return RESULT_EOP; } else { + Result res = new Result(); Tuple next = (Tuple) reader.getCurrentValue(); res.result = next; res.returnStatus = POStatus.STATUS_OK; if (inputRecordCounter != null) { inputRecordCounter.increment(1); } + return res; } - return res; } catch (IOException e) { throw new ExecException(e); }