Return-Path: X-Original-To: apmail-pig-commits-archive@www.apache.org Delivered-To: apmail-pig-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1A22218BBF for ; Mon, 2 Nov 2015 19:55:17 +0000 (UTC) Received: (qmail 86514 invoked by uid 500); 2 Nov 2015 19:55:17 -0000 Delivered-To: apmail-pig-commits-archive@pig.apache.org Received: (qmail 86447 invoked by uid 500); 2 Nov 2015 19:55:16 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 86438 invoked by uid 99); 2 Nov 2015 19:55:16 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Nov 2015 19:55:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 5C3CBC070B for ; Mon, 2 Nov 2015 19:55:16 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.79 X-Spam-Level: * X-Spam-Status: No, score=1.79 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id RRDljksGAwZh for ; Mon, 2 Nov 2015 19:55:13 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with ESMTP id A91162139C for ; Mon, 2 Nov 2015 19:55:13 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 5AA1DE0256 for ; Mon, 2 Nov 2015 19:55:13 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 585F73A019A for ; Mon, 2 Nov 2015 19:55:13 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1712130 - in /pig/branches/branch-0.15: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java test/e2e/pig/tests/nightly.conf Date: Mon, 02 Nov 2015 19:55:13 -0000 To: commits@pig.apache.org From: daijy@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20151102195513.585F73A019A@svn01-us-west.apache.org> Author: daijy Date: Mon Nov 2 19:55:12 2015 New Revision: 1712130 URL: http://svn.apache.org/viewvc?rev=1712130&view=rev Log: PIG-4707: [Pig on Tez] Streaming job hangs with pig.exec.mapPartAgg=true Modified: pig/branches/branch-0.15/CHANGES.txt pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf Modified: pig/branches/branch-0.15/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1712130&r1=1712129&r2=1712130&view=diff ============================================================================== --- pig/branches/branch-0.15/CHANGES.txt (original) +++ pig/branches/branch-0.15/CHANGES.txt Mon Nov 2 19:55:12 2015 @@ -28,6 +28,8 @@ OPTIMIZATIONS BUG FIXES +PIG-4707: [Pig on Tez] Streaming job hangs with pig.exec.mapPartAgg=true (rohini) + PIG-4679: Performance degradation due to InputSizeReducerEstimator since PIG-3754 (daijy) PIG-4644: PORelationToExprProject.clone() is broken (erwaman via rohini) Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1712130&r1=1712129&r2=1712130&view=diff ============================================================================== --- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original) +++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Mon Nov 2 19:55:12 2015 @@ -26,26 +26,27 @@ import java.util.concurrent.ArrayBlockin import java.util.concurrent.BlockingQueue; import org.apache.pig.PigException; -import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.plan.OperatorKey; -import org.apache.pig.impl.plan.VisitorException; -import org.apache.pig.impl.streaming.ExecutableManager; -import org.apache.pig.impl.streaming.StreamingCommand; -import org.apache.pig.pen.util.ExampleTuple; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.streaming.ExecutableManager; +import org.apache.pig.impl.streaming.StreamingCommand; +import org.apache.pig.pen.util.ExampleTuple; public class POStream extends PhysicalOperator { private static final long serialVersionUID = 2L; - + private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null); private String executableManagerStr; // String representing ExecutableManager to use - transient private ExecutableManager executableManager; // ExecutableManager to use + transient private ExecutableManager executableManager; // ExecutableManager to use private StreamingCommand command; // Actual command to be run private Properties properties; @@ -67,7 +68,7 @@ public class POStream extends PhysicalOp */ private boolean isFetchable; - public POStream(OperatorKey k, ExecutableManager executableManager, + public POStream(OperatorKey k, ExecutableManager executableManager, StreamingCommand command, Properties properties) { super(k); this.executableManagerStr = executableManager.getClass().getName(); @@ -76,21 +77,21 @@ public class POStream extends PhysicalOp // Setup streaming-specific properties if (command.getShipFiles()) { - parseShipCacheSpecs(command.getShipSpecs(), + parseShipCacheSpecs(command.getShipSpecs(), properties, "pig.streaming.ship.files"); } - parseShipCacheSpecs(command.getCacheSpecs(), + parseShipCacheSpecs(command.getCacheSpecs(), properties, "pig.streaming.cache.files"); } - - private static void parseShipCacheSpecs(List specs, + + private static void parseShipCacheSpecs(List specs, Properties properties, String property) { - + String existingValue = properties.getProperty(property, ""); if (specs == null || specs.size() == 0) { return; } - + // Setup streaming-specific properties StringBuffer sb = new StringBuffer(); Iterator i = specs.iterator(); @@ -107,13 +108,13 @@ public class POStream extends PhysicalOp sb.append(", "); } } - properties.setProperty(property, sb.toString()); + properties.setProperty(property, sb.toString()); } public Properties getShipCacheProperties() { return properties; } - + /** * Get the {@link StreamingCommand} for this StreamSpec. * @return the {@link StreamingCommand} for this StreamSpec @@ -121,17 +122,13 @@ public class POStream extends PhysicalOp public StreamingCommand getCommand() { return command; } - - - /* (non-Javadoc) - * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#getNext(org.apache.pig.data.Tuple) - */ + @Override public Result getNextTuple() throws ExecException { // The POStream Operator works with ExecutableManager to // send input to the streaming binary and to get output // from it. To achieve a tuple oriented behavior, two queues - // are used - one for output from the binary and one for + // are used - one for output from the binary and one for // input to the binary. In each getNext() call: // 1) If there is no more output expected from the binary, an EOP is // sent to successor @@ -141,14 +138,14 @@ public class POStream extends PhysicalOp // send input to the binary, then the next tuple from the // predecessor is got and passed to the binary try { - // if we are being called AFTER all output from the streaming + // if we are being called AFTER all output from the streaming // binary has already been sent to us then just return EOP // The "allOutputFromBinaryProcessed" flag is set when we see // an EOS (End of Stream output) from streaming binary if(allOutputFromBinaryProcessed) { - return new Result(POStatus.STATUS_EOP, null); + return EOP_RESULT; } - + // if we are here AFTER all map() calls have been completed // AND AFTER we process all possible input to be sent to the // streaming binary, then all we want to do is read output from @@ -159,19 +156,16 @@ public class POStream extends PhysicalOp // If we received EOS, it means all output // from the streaming binary has been sent to us // So we can send an EOP to the successor in - // the pipeline. Also since we are being called - // after all input from predecessor has been processed - // it means we got here from a call from close() in - // map or reduce. So once we send this EOP down, - // getNext() in POStream should never be called. So - // we don't need to set any flag noting we saw all output - // from binary + // the pipeline and also note this condition + // for future calls r = EOP_RESULT; - } else if (r.returnStatus == POStatus.STATUS_OK) + allOutputFromBinaryProcessed = true; + } else if (r.returnStatus == POStatus.STATUS_OK) { illustratorMarkup(r.result, r.result, 0); + } return(r); } - + // if we are here, we haven't consumed all input to be sent // to the streaming binary - check if we are being called // from close() on the map or reduce @@ -184,7 +178,7 @@ public class POStream extends PhysicalOp // then "initialized" will be true. If not, just // send EOP down. if(getInitialized()) { - // signal End of ALL input to the Executable Manager's + // signal End of ALL input to the Executable Manager's // Input handler thread binaryInputQueue.put(r); // note this state for future calls @@ -195,30 +189,24 @@ public class POStream extends PhysicalOp // If we received EOS, it means all output // from the streaming binary has been sent to us // So we can send an EOP to the successor in - // the pipeline. Also since we are being called - // after all input from predecessor has been processed - // it means we got here from a call from close() in - // map or reduce. So once we send this EOP down, - // getNext() in POStream should never be called. So - // we don't need to set any flag noting we saw all output - // from binary + // the pipeline and also note this condition + // for future calls r = EOP_RESULT; + allOutputFromBinaryProcessed = true; } } - + } else if(r.returnStatus == POStatus.STATUS_EOS) { // If we received EOS, it means all output // from the streaming binary has been sent to us // So we can send an EOP to the successor in - // the pipeline. Also we are being called - // from close() in map or reduce (this is so because - // only then this.parentPlan.endOfAllInput is true). - // So once we send this EOP down, getNext() in POStream - // should never be called. So we don't need to set any - // flag noting we saw all output from binary + // the pipeline and also note this condition + // for future calls r = EOP_RESULT; - } else if (r.returnStatus == POStatus.STATUS_OK) + allOutputFromBinaryProcessed = true; + } else if (r.returnStatus == POStatus.STATUS_OK) { illustratorMarkup(r.result, r.result, 0); + } return r; } else { // we are not being called from close() - so @@ -232,18 +220,19 @@ public class POStream extends PhysicalOp // for future calls r = EOP_RESULT; allOutputFromBinaryProcessed = true; - } else if (r.returnStatus == POStatus.STATUS_OK) + } else if (r.returnStatus == POStatus.STATUS_OK) { illustratorMarkup(r.result, r.result, 0); + } return r; } - + } catch(Exception e) { int errCode = 2083; String msg = "Error while trying to get next result in POStream."; throw new ExecException(msg, errCode, PigException.BUG, e); } - - + + } public synchronized boolean getInitialized() { @@ -264,13 +253,13 @@ public class POStream extends PhysicalOp Result res = binaryOutputQueue.take(); return res; } - - // check if we can write tuples to + + // check if we can write tuples to // input of the process if(binaryInputQueue.remainingCapacity() > 0) { - + Result input = processInput(); - if(input.returnStatus == POStatus.STATUS_EOP || + if(input.returnStatus == POStatus.STATUS_EOP || input.returnStatus == POStatus.STATUS_ERR) { return input; } else { @@ -278,16 +267,16 @@ public class POStream extends PhysicalOp // Only when we see the first tuple which can // be sent as input to the binary we want // to initialize the ExecutableManager and set - // up the streaming binary - this is required in + // up the streaming binary - this is required in // Unions due to a JOIN where there may never be // any input to send to the binary in one of the map // tasks - so we initialize only if we have to. // initialize the ExecutableManager once if(!initialized) { // set up the executableManager - executableManager = + executableManager = (ExecutableManager)PigContext.instantiateFuncFromSpec(executableManagerStr); - + try { executableManager.configure(this); executableManager.run(); @@ -295,22 +284,22 @@ public class POStream extends PhysicalOp int errCode = 2084; String msg = "Error while running streaming binary."; throw new ExecException(msg, errCode, PigException.BUG, ioe); - } + } initialized = true; } - + // send this input to the streaming // process binaryInputQueue.put(input); } - + } else { - + // wait for either input to be available // or output to be consumed while(binaryOutputQueue.isEmpty() && !binaryInputQueue.isEmpty()) wait(); - + } } } @@ -320,21 +309,22 @@ public class POStream extends PhysicalOp throw new ExecException(msg, errCode, PigException.BUG, e); } } - + + @Override public String toString() { return getAliasString() + "POStream" + "[" + command.toString() + "]" + " - " + mKey.toString(); } - + @Override public void visit(PhyPlanVisitor v) throws VisitorException { v.visitStream(this); - + } @Override public String name() { - return toString(); + return toString(); } @Override @@ -348,7 +338,7 @@ public class POStream extends PhysicalOp } /** - * + * */ public void finish() throws IOException { executableManager.close(); @@ -367,7 +357,7 @@ public class POStream extends PhysicalOp public BlockingQueue getBinaryOutputQueue() { return binaryOutputQueue; } - + @Override public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { if(illustrator != null) { Modified: pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf?rev=1712130&r1=1712129&r2=1712130&view=diff ============================================================================== --- pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf (original) +++ pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf Mon Nov 2 19:55:12 2015 @@ -689,7 +689,24 @@ store c into ':OUTPATH:';\, store d into ':OUTPATH:'; #, 'java_params' => ['-Dpig.exec.mapPartAgg=true'] - }, + }, + + { + #PIG-4707 Streaming and empty input + + 'num' => 6, + 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); + b = group a by name; + c = foreach b generate flatten(a); + d = stream c through `cat` as (name, age, gpa); + e = filter d by name == 'nonexistent'; + SPLIT e into f if gpa > 2, g otherwise; + store f into ':OUTPATH:.1'; + store g into ':OUTPATH:.2'; + #, + 'java_params' => ['-Dpig.exec.mapPartAgg=true'] + + }, ], },