From: cheolsoo@apache.org
To: commits@pig.apache.org
Subject: svn commit: r1563765 [1/2] - in /pig/trunk: ./ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/fetch/ ...
Date: Mon, 03 Feb 2014 03:57:26 -0000

Author: cheolsoo
Date: Mon Feb 3 03:57:25 2014
New Revision: 1563765

URL: http://svn.apache.org/r1563765
Log:
PIG-3642: Direct HDFS access for small jobs (fetch) (lbendig via cheolsoo)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java
    pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java
    pig/trunk/test/org/apache/pig/test/TestFetch.java

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
    pig/trunk/test/e2e/pig/tests/cmdline.conf
    pig/trunk/test/e2e/pig/tests/negative.conf
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/org/apache/pig/test/TestAssert.java
    pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java
    pig/trunk/test/org/apache/pig/test/TestPruneColumn.java

Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Mon Feb 3 03:57:25 2014 @@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag IMPROVEMENTS +PIG-3642: Direct HDFS access for small jobs (fetch) (lbendig via cheolsoo) + PIG-3730: Performance issue in SelfSpillBag (rajesh.balamohan via rohini) PIG-3654: Add class cache to PigContext (tmwoodruff via daijy) Modified: pig/trunk/conf/pig.properties URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/conf/pig.properties (original) +++ pig/trunk/conf/pig.properties Mon Feb 3 03:57:25 2014 @@ -62,6 +62,7 @@ #pig.skewedjoin.reduce.memusage=0.3 #pig.exec.nocombiner=false #opt.multiquery=true +#opt.fetch=true #Following parameters are for configuring intermediate storage format #Supported storage types are seqfile and tfile Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java (original) +++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java Mon Feb 3 03:57:25 2014 @@ -367,7 +367,8 @@ public class CSVExcelStorage extends Pig // further records to it. If they are the same (this would // happen if multiple small files each with a header were combined // into one split), we know to skip the duplicate header record as well.
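// A splitIndex of -1 presumably marks a read outside of any MR input split (e.g. the new fetch path introduced by this commit), so the header must be skipped in that case as well; this is what the widened condition below does.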
- if (loadingFirstRecord && headerTreatment == Headers.SKIP_INPUT_HEADER && splitIndex == 0) { + if (loadingFirstRecord && headerTreatment == Headers.SKIP_INPUT_HEADER && + (splitIndex == 0 || splitIndex == -1)) { try { if (!in.nextKeyValue()) return null; Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java (original) +++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java Mon Feb 3 03:57:25 2014 @@ -295,7 +295,7 @@ public class FixedWidthLoader extends Lo @Override public Tuple getNext() throws IOException { - if (loadingFirstRecord && skipHeader && splitIndex == 0) { + if (loadingFirstRecord && skipHeader && (splitIndex == 0 || splitIndex == -1)) { try { if (!reader.nextKeyValue()) return null; Modified: pig/trunk/src/org/apache/pig/Main.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/Main.java (original) +++ pig/trunk/src/org/apache/pig/Main.java Mon Feb 3 03:57:25 2014 @@ -208,6 +208,7 @@ public class Main { opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED); opts.registerOpt('F', "stop_on_failure", CmdLineParser.ValueExpected.NOT_ACCEPTED); opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.NOT_ACCEPTED); + opts.registerOpt('N', "no_fetch", CmdLineParser.ValueExpected.NOT_ACCEPTED); opts.registerOpt('P', "propertyFile", CmdLineParser.ValueExpected.REQUIRED); ExecMode mode = ExecMode.UNKNOWN; @@ -300,6 +301,10 @@ public class Main { properties.setProperty("opt.multiquery",""+false); break; + case 'N': + properties.setProperty(PigConfiguration.OPT_FETCH,""+false); + break; + case 'p': params.add(opts.getValStr()); break; @@ -863,6 +868,7 @@ public class Main { System.out.println(" -x, -exectype - Set execution mode: local|mapreduce, default is mapreduce."); System.out.println(" -F, -stop_on_failure - Aborts execution on the first failed job; default is off"); System.out.println(" -M, -no_multiquery - Turn multiquery optimization off; default is on"); + System.out.println(" -N, -no_fetch - Turn fetch optimization off; default is on"); System.out.println(" -P, -propertyFile - Path to property file"); System.out.println(" -printCmdDebug - Overrides anything else and prints the actual command used to run Pig, including"); System.out.println(" any environment variables that are set by the pig command."); @@ -885,6 +891,8 @@ public class Main { System.out.println(" Only disable combiner as a temporary workaround for problems."); System.out.println(" opt.multiquery=true|false; multiquery is on by default."); System.out.println(" Only disable multiquery as a temporary workaround for problems."); + System.out.println(" opt.fetch=true|false; fetch is on by default."); + System.out.println(" Scripts containing Filter, Foreach, Limit, Stream, and Union can be dumped without MR jobs."); System.out.println(" pig.tmpfilecompression=true|false; compression is off by default."); System.out.println(" Determines whether 
output of intermediate jobs is compressed."); System.out.println(" pig.tmpfilecompression.codec=lzo|gzip; default is gzip."); Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/PigConfiguration.java (original) +++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Feb 3 03:57:25 2014 @@ -148,5 +148,11 @@ public class PigConfiguration { * Controls the max threshold size to convert jobs to run in local mode */ public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes"; + + /** + * This parameter enables/disables fetching. By default it is turned on. + */ + public static final String OPT_FETCH = "opt.fetch"; + } Modified: pig/trunk/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/PigServer.java (original) +++ pig/trunk/src/org/apache/pig/PigServer.java Mon Feb 3 03:57:25 2014 @@ -104,6 +104,7 @@ import org.apache.pig.tools.pigstats.Out import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.PigStats.JobGraph; import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.pig.tools.pigstats.SimpleFetchPigStats; /** * @@ -418,6 +419,12 @@ public class PigServer { */ protected List getJobs(PigStats stats) { LinkedList jobs = new LinkedList(); + if (stats instanceof SimpleFetchPigStats) { + HJob job = new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, stats.result(null) + .getPOStore(), null); + jobs.add(job); + return jobs; + } JobGraph jGraph = stats.getJobGraph(); Iterator iter = jGraph.iterator(); while (iter.hasNext()) { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Feb 3 03:57:25 2014 @@ -40,7 +40,8 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.executionengine.ExecutionEngine; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.datastorage.HDataStorage; -import org.apache.pig.backend.hadoop.executionengine.Launcher; +import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher; +import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; @@ -355,6 +356,13 @@ public abstract class HExecutionEngine i try { PhysicalPlan pp = compile(lp, pc.getProperties()); + //if the compiled physical plan fulfills the requirements of the + //fetch optimizer, then further transformations / MR job creation are + //skipped; a SimpleFetchPigStats will be returned through which 
the result + //can be directly fetched from the underlying storage + if (FetchOptimizer.isPlanFetchable(pc, pp)) { + return new FetchLauncher(pc).launchPig(pp); + } return launcher.launchPig(pp, grpName, pigContext); } catch (ExecException e) { throw (ExecException) e; @@ -385,6 +393,10 @@ public abstract class HExecutionEngine i pp.explain(pps, format, verbose); MapRedUtil.checkLeafIsStore(pp, pigContext); + if (FetchOptimizer.isPlanFetchable(pc, pp)) { + new FetchLauncher(pigContext).explain(pp, pc, eps, format); + return; + } launcher.explain(pp, pigContext, eps, format, verbose); } finally { launcher.reset(); Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1563765&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Mon Feb 3 03:57:25 2014 @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.fetch; + +import java.io.IOException; +import java.io.PrintStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; +import org.apache.pig.data.SchemaTupleBackend; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.SimpleFetchPigStats; +import org.joda.time.DateTimeZone; + +/** + * This class is responsible for executing the fetch task, saving the result to disk + * and doing the necessary cleanup afterwards. + * + */ +public class FetchLauncher { + + private final PigContext pigContext; + private final Configuration conf; + + public FetchLauncher(PigContext pigContext) { + this.pigContext = pigContext; + this.conf = ConfigurationUtil.toConfiguration(pigContext.getProperties()); + } + + /** + * Runs the fetch task by executing a chain of calls on the PhysicalPlan from the leaf + * up to the LoadFunc + * + * @param pp - Physical plan + * @return SimpleFetchPigStats instance representing the fetched result + * @throws IOException + */ + public PigStats launchPig(PhysicalPlan pp) throws IOException { + POStore poStore = (POStore) pp.getLeaves().get(0); + init(pp, poStore); + + // run fetch + runPipeline(poStore); + + UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp, + new DependencyOrderWalker(pp)); + udfFinisher.visit(); + + return PigStats.start(new SimpleFetchPigStats(pigContext, poStore)); + } + + /** + * Prints an empty MR plan, i.e. a note that no MR jobs are needed for a fetch task + * + * @param pp - Physical plan + * @param pc - PigContext + * @param ps - PrintStream to write the plan to + * @param format format of the output plan + * @throws PlanException + * @throws VisitorException + * @throws IOException + */ + public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps, String format) + throws PlanException, VisitorException, IOException { + if ("xml".equals(format)) { + ps.println("No MR jobs. Fetch only"); + } + else { + ps.println("#--------------------------------------------------"); + ps.println("# Map Reduce Plan "); + ps.println("#--------------------------------------------------"); + ps.println("No MR jobs.
Fetch only."); + } + return; + } + + private void init(PhysicalPlan pp, POStore poStore) throws IOException { + + poStore.setStoreImpl(new FetchPOStoreImpl(pigContext)); + poStore.setUp(); + if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) { + MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf); + } + + PhysicalOperator.setReporter(new FetchProgressableReporter()); + SchemaTupleBackend.initialize(conf, pigContext); + + UDFContext udfContext = UDFContext.getUDFContext(); + udfContext.addJobConf(conf); + udfContext.setClientSystemProps(pigContext.getProperties()); + udfContext.serialize(conf); + + PigMapReduce.sJobConfInternal.set(conf); + String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz"); + if (dtzStr != null && dtzStr.length() > 0) { + // ensure that the internal timezone is uniformly in UTC offset style + DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null))); + } + } + + private void runPipeline(POStore posStore) throws IOException { + while (true) { + Result res = posStore.getNextTuple(); + if (res.returnStatus == POStatus.STATUS_OK) + continue; + + if (res.returnStatus == POStatus.STATUS_EOP) { + posStore.tearDown(); + return; + } + + if (res.returnStatus == POStatus.STATUS_NULL) + continue; + + if(res.returnStatus==POStatus.STATUS_ERR){ + String errMsg; + if(res.result != null) { + errMsg = "Fetch failed. Couldn't retrieve result: " + res.result; + } else { + errMsg = "Fetch failed. Couldn't retrieve result"; + } + int errCode = 2088; + ExecException ee = new ExecException(errMsg, errCode, PigException.BUG); + throw ee; + } + } + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?rev=1563765&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java Mon Feb 3 03:57:25 2014 @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.fetch; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigConfiguration; +import org.apache.pig.backend.datastorage.DataStorageException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.impl.PigContext; +import 
org.apache.pig.impl.builtin.SampleLoader; +import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.Utils; + +/** + * FetchOptimizer determines whether the entire physical plan is fetchable, meaning + * that the task's result can be directly read (fetched) from the underlying storage + * rather than creating MR jobs. During the check {@link FetchablePlanVisitor} is used + * to walk through the plan. + * + */ +public class FetchOptimizer { + private static final Log LOG = LogFactory.getLog(FetchOptimizer.class); + + /** + * Checks whether the fetch is enabled + * + * @param pc + * @return true if fetching is enabled + */ + public static boolean isFetchEnabled(PigContext pc) { + return "true".equalsIgnoreCase( + pc.getProperties().getProperty(PigConfiguration.OPT_FETCH, "true")); + } + + /** + * Visits the plan with {@link FetchablePlanVisitor} and checks whether the + * plan is fetchable. + * + * @param pc PigContext + * @param pp the physical plan to be examined + * @return true if the plan is fetchable + * @throws VisitorException + */ + public static boolean isPlanFetchable(PigContext pc, PhysicalPlan pp) throws VisitorException { + if (isEligible(pc, pp)) { + FetchablePlanVisitor fpv = new FetchablePlanVisitor(pc, pp); + fpv.visit(); + boolean isFetchable = fpv.isPlanFetchable(); + //initialization + if (isFetchable) + init(pp); + return isFetchable; + } + return false; + } + + private static void init(PhysicalPlan pp) throws VisitorException { + //mark POStream ops 'fetchable' + LinkedList posList = PlanHelper.getPhysicalOperators(pp, POStream.class); + for (POStream pos : posList) { + pos.setFetchable(true); + } + } + + /** + * Checks whether the plan fulfills the prerequisites needed for fetching. + * + * @param pc PigContext + * @param pp the physical plan to be examined + * @return + */ + private static boolean isEligible(PigContext pc, PhysicalPlan pp) { + if (!isFetchEnabled(pc)) { + return false; + } + + List roots = pp.getRoots(); + for (PhysicalOperator po : roots) { + if (!(po instanceof POLoad)) { + String msg = "Expected physical operator at root is POLoad. Found : " + + po.getClass().getCanonicalName() + ". Fetch optimizer will be disabled."; + LOG.debug(msg); + return false; + } + } + + //consider single leaf jobs only + int leafSize = pp.getLeaves().size(); + if (pp.getLeaves().size() != 1) { + LOG.debug("Expected physical plan should have one leaf. Found " + leafSize); + return false; + } + + return true; + } + + /** + * A plan is considered 'fetchable' if: + *
+     * - it contains only: LIMIT, FILTER, FOREACH, STREAM, UNION (no implicit SPLIT is allowed)
+     * - no STORE
+     * - no scalar aliases ({@link org.apache.pig.impl.builtin.ReadScalars ReadScalars})
+     * - {@link org.apache.pig.LoadFunc LoadFunc} is not a {@link org.apache.pig.impl.builtin.SampleLoader SampleLoader}
+     * 
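+     * For illustration, assuming default settings (the input path below is
+     * hypothetical), a script of this shape satisfies the rules above and can
+     * be dumped without spawning MR jobs:
+     *
+     *   A = load '/data/students' as (name:chararray, age:int);
+     *   B = filter A by age > 20;
+     *   C = limit B 10;
+     *   dump C;
+     *
+     * Adding e.g. a GROUP, ORDER or JOIN makes the plan non-fetchable. Fetch
+     * can also be disabled globally via -N/-no_fetch or -Dopt.fetch=false.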
+ */ + private static class FetchablePlanVisitor extends PhyPlanVisitor { + + private boolean planFetchable = true; + private PigContext pc; + + public FetchablePlanVisitor(PigContext pc, PhysicalPlan plan) { + super(plan, new DepthFirstWalker(plan)); + this.pc = pc; + } + + @Override + public void visit() throws VisitorException { + new PhyPlanSetter(mPlan).visit(); + super.visit(); + } + + @Override + public void visitLoad(POLoad ld) throws VisitorException{ + if (ld.getLoadFunc() instanceof SampleLoader) { + planFetchable = false; + } + } + + @Override + public void visitStore(POStore st) throws VisitorException{ + String basePathName = st.getSFile().getFileName(); + + //plan is fetchable if POStore belongs to EXPLAIN + if ("fakefile".equals(basePathName)) { + return; + } + + //Otherwise check if target storage format equals to the intermediate storage format + //and its path points to a temporary storage path + boolean hasTmpStorageClass = st.getStoreFunc().getClass() + .equals(Utils.getTmpFileStorageClass(pc.getProperties())); + + try { + boolean hasTmpTargetPath = isTempPath(basePathName); + if (!(hasTmpStorageClass && hasTmpTargetPath)) { + planFetchable = false; + } + } + catch (IOException e) { + String msg = "Internal error. Could not retrieve temporary store location."; + throw new VisitorException(msg, e); + } + } + + @Override + public void visitNative(PONative nat) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitPackage(POPackage pkg) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitSplit(POSplit spl) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitDemux(PODemux demux) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitCounter(POCounter poCounter) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitRank(PORank rank) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitDistinct(PODistinct distinct) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitSort(POSort sort) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitCross(POCross cross) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitFRJoin(POFRJoin join) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitMergeJoin(POMergeJoin join) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException { + 
planFetchable = false; + } + + @Override + public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException { + planFetchable = false; + } + + @Override + public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) + throws VisitorException { + planFetchable = false; + } + + @Override + public void visitPreCombinerLocalRearrange( + POPreCombinerLocalRearrange preCombinerLocalRearrange) { + planFetchable = false; + } + + @Override + public void visitPartialAgg(POPartialAgg poPartialAgg) { + planFetchable = false; + } + + private boolean isPlanFetchable() { + return planFetchable; + } + + private boolean isTempPath(String basePathName) throws DataStorageException { + String tdir = pc.getProperties().getProperty("pig.temp.dir", "/tmp"); + String tempStore = pc.getDfs().asContainer(tdir + "/temp").toString(); + Matcher matcher = Pattern.compile(tempStore + "-?[0-9]+").matcher(basePathName); + return matcher.lookingAt(); + } + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java?rev=1563765&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java Mon Feb 3 03:57:25 2014 @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.fetch; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.pig.StoreFuncInterface; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; +import org.apache.pig.impl.PigContext; + +/** + * This class is used to have a POStore write the output to the underlying storage + * via an output collector/record writer in the case of a fetch task. It sets up dummy context + * objects which otherwise would be initialized by the Hadoop job itself.
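+ * Concretely, createStoreFunc() builds dummy job/task attempt contexts, checks
+ * the output specs and opens a RecordWriter for the StoreFunc, while tearDown()
+ * and cleanUp() close that writer and commit or clean up the task output.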
+ */ +public class FetchPOStoreImpl extends POStoreImpl { + + private PigContext pc; + private RecordWriter writer; + private TaskAttemptContext context; + private OutputCommitter outputCommitter; + + public FetchPOStoreImpl(PigContext pc) { + this.pc = pc; + } + + @Override + public StoreFuncInterface createStoreFunc(POStore store) throws IOException { + + Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties()); + StoreFuncInterface storeFunc = store.getStoreFunc(); + JobContext jc = HadoopShims.createJobContext(conf, new JobID()); + + OutputFormat outputFormat = storeFunc.getOutputFormat(); + PigOutputFormat.setLocation(jc, store); + context = HadoopShims.createTaskAttemptContext(conf, HadoopShims.getNewTaskAttemptID()); + PigOutputFormat.setLocation(context, store); + + try { + outputFormat.checkOutputSpecs(jc); + } + catch (InterruptedException e) { + throw new IOException(e); + } + + try { + outputCommitter = outputFormat.getOutputCommitter(context); + outputCommitter.setupJob(jc); + outputCommitter.setupTask(context); + writer = outputFormat.getRecordWriter(context); + } + catch (InterruptedException e) { + throw new IOException(e); + } + storeFunc.prepareToWrite(writer); + return storeFunc; + } + + @Override + public void tearDown() throws IOException { + if (writer != null) { + try { + writer.close(context); + } + catch (InterruptedException e) { + throw new IOException(e); + } + writer = null; + } + if (outputCommitter.needsTaskCommit(context)) + outputCommitter.commitTask(context); + HadoopShims.commitOrCleanup(outputCommitter, context); + } + + @Override + public void cleanUp() throws IOException { + if (writer != null) { + try { + writer.close(context); + } + catch (InterruptedException e) { + throw new IOException(e); + } + writer = null; + } + HadoopShims.commitOrCleanup(outputCommitter, context); + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java?rev=1563765&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java Mon Feb 3 03:57:25 2014 @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.fetch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable; + +/** + * A dummy ProgressableReporter used for fetch tasks + * + */ +public class FetchProgressableReporter implements PigProgressable { + + private static final Log LOG = LogFactory.getLog(FetchProgressableReporter.class); + + public void progress() { + + } + + public void progress(String msg) { + LOG.info(msg); + } + +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Feb 3 03:57:25 2014 @@ -685,27 +685,13 @@ public class JobControlCompiler{ if(!pigContext.inIllustrator) mro.reducePlan.remove(st); } - - // set out filespecs - String outputPathString = st.getSFile().getFileName(); - if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) { - conf.set("pig.streaming.log.dir", - new Path(outputPathString, LOG_DIR).toString()); - } else { - String tmpLocationStr = FileLocalizer - .getTemporaryPath(pigContext).toString(); - tmpLocation = new Path(tmpLocationStr); - conf.set("pig.streaming.log.dir", - new Path(tmpLocation, LOG_DIR).toString()); - } - conf.set("pig.streaming.task.output.dir", outputPathString); + + MapRedUtil.setupStreamingDirsConfSingle(st, pigContext, conf); } else if (mapStores.size() + reduceStores.size() > 0) { // multi store case log.info("Setting up multi store job"); - String tmpLocationStr = FileLocalizer - .getTemporaryPath(pigContext).toString(); - tmpLocation = new Path(tmpLocationStr); - + MapRedUtil.setupStreamingDirsConfMulti(pigContext, conf); + nwJob.setOutputFormatClass(PigOutputFormat.class); boolean disableCounter = conf.getBoolean("pig.disable.counter", false); @@ -718,10 +704,6 @@ public class JobControlCompiler{ sto.setMultiStore(true); sto.setIndex(idx++); } - - conf.set("pig.streaming.log.dir", - new Path(tmpLocation, LOG_DIR).toString()); - conf.set("pig.streaming.task.output.dir", tmpLocation.toString()); } // store map key type Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Feb 3 03:57:25 2014 @@ -125,11 +125,7 @@ public class POUserFunc extends Expressi private void instantiateFunc(FuncSpec fSpec) { this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec); this.setSignature(signature); - Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass()); - Schema 
tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature); - - if(tmpS!=null) - this.func.setInputSchema(tmpS); + this.setFuncInputSchema(signature); if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) { executor = new MonitoredUDFExecutor(func); } @@ -604,4 +600,17 @@ public class POUserFunc extends Expressi this.func.setUDFContextSignature(signature); } } + + /** + * Sets EvalFunc's inputschema based on the signature + * @param signature + */ + public void setFuncInputSchema(String signature) { + Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass()); + Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature); + if(tmpS!=null) { + this.func.setInputSchema(tmpS); + } + } + } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Mon Feb 3 03:57:25 2014 @@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; @@ -32,9 +31,7 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.streaming.ExecutableManager; import org.apache.pig.impl.streaming.StreamingCommand; -import org.apache.pig.impl.util.IdentityHashSet; import org.apache.pig.pen.util.ExampleTuple; -import org.apache.pig.pen.util.LineageTracer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; @@ -62,6 +59,14 @@ public class POStream extends PhysicalOp protected boolean allOutputFromBinaryProcessed = false; + /** + * This flag indicates whether streaming is done through fetching. If set, + * {@link FetchLauncher} pulls out the data from the pipeline. Therefore we need to + * skip the case in {@link #getNextTuple()} which is called by map() or reduce() when + * processing the next tuple. 
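+ * (In fetch mode there is no endOfAllInput signal from a map() or reduce()
+ * close(), so this flag stands in for it when the remaining input is drained.)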
+ */ + private boolean isFetchable; + public POStream(OperatorKey k, ExecutableManager executableManager, StreamingCommand command, Properties properties) { super(k); @@ -170,7 +175,7 @@ public class POStream extends PhysicalOp // if we are here, we haven't consumed all input to be sent // to the streaming binary - check if we are being called // from close() on the map or reduce - if(this.parentPlan.endOfAllInput) { + if(isFetchable || this.parentPlan.endOfAllInput) { Result r = getNextHelper((Tuple)null); if(r.returnStatus == POStatus.STATUS_EOP) { // we have now seen *ALL* possible input @@ -373,4 +378,19 @@ public class POStream extends PhysicalOp } return (Tuple) out; } + + /** + * @return true if streaming is done through fetching + */ + public boolean isFetchable() { + return isFetchable; + } + + /** + * @param isFetchable - whether fetching is applied on POStream + */ + public void setFetchable(boolean isFetchable) { + this.isFetchable = isFetchable; + } + } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Mon Feb 3 03:57:25 2014 @@ -41,6 +41,7 @@ import org.apache.pig.FuncSpec; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; @@ -174,6 +175,48 @@ public class MapRedUtil { } } + /** + * Sets up output and log dir paths for a single-store streaming job + * + * @param st - POStore of the current job + * @param pigContext + * @param conf + * @throws IOException + */ + public static void setupStreamingDirsConfSingle(POStore st, PigContext pigContext, + Configuration conf) throws IOException { + // set out filespecs + String outputPathString = st.getSFile().getFileName(); + if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) { + conf.set("pig.streaming.log.dir", + new Path(outputPathString, JobControlCompiler.LOG_DIR).toString()); + } + else { + String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString(); + Path tmpLocation = new Path(tmpLocationStr); + conf.set("pig.streaming.log.dir", + new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString()); + } + conf.set("pig.streaming.task.output.dir", outputPathString); + } + + /** + * Sets up output and log dir paths for a multi-store streaming job + * + * @param pigContext + * @param conf + * @throws IOException + */ + public static void setupStreamingDirsConfMulti(PigContext pigContext, Configuration conf) + throws IOException { + + String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString(); + Path tmpLocation = new Path(tmpLocationStr); + conf.set("pig.streaming.log.dir", + new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString()); + conf.set("pig.streaming.task.output.dir", 
tmpLocation.toString()); + } + public static FileSpec checkLeafIsStore( PhysicalPlan plan, PigContext pigContext) throws ExecException { Modified: pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java (original) +++ pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java Mon Feb 3 03:57:25 2014 @@ -26,6 +26,7 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigConfiguration; public class PropertiesUtil { private static final String DEFAULT_PROPERTIES_FILE = "/pig-default.properties"; @@ -143,6 +144,11 @@ public class PropertiesUtil { //by default we keep going on error on the backend properties.setProperty("stop.on.failure", ""+false); } + + if (properties.getProperty(PigConfiguration.OPT_FETCH) == null) { + //by default fetch optimization is on + properties.setProperty(PigConfiguration.OPT_FETCH, ""+true); + } } /** Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original) +++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Mon Feb 3 03:57:25 2014 @@ -25,9 +25,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; import java.io.SequenceInputStream; -import java.net.Socket; -import java.net.SocketException; -import java.net.SocketImplFactory; import java.util.Arrays; import java.util.Collection; import java.util.Enumeration; @@ -334,7 +331,7 @@ public class Utils { } public static FileInputLoadFunc getTmpFileStorageObject(Configuration conf) throws IOException { - Class storageClass = getTmpFileStorage(ConfigurationUtil.toProperties(conf)).getStorageClass(); + Class storageClass = getTmpFileStorageClass(ConfigurationUtil.toProperties(conf)); try { return storageClass.newInstance(); } catch (InstantiationException e) { @@ -344,6 +341,10 @@ public class Utils { } } + public static Class getTmpFileStorageClass(Properties properties) { + return getTmpFileStorage(properties).getStorageClass(); + } + private static TEMPFILE_STORAGE getTmpFileStorage(Properties properties) { boolean tmpFileCompression = properties.getProperty( PigConfiguration.PIG_ENABLE_TEMP_FILE_COMPRESSION, "false").equals("true"); Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Mon Feb 3 03:57:25 2014 @@ -506,6 +506,10 @@ public class ExpToPhyTranslationVisitor .getNextNodeId(DEFAULT_SCOPE)), -1, null, op.getFuncSpec(), (EvalFunc) f); ((POUserFunc)p).setSignature(op.getSignature()); + //reinitialize input schema from signature + if 
(((POUserFunc)p).getFunc().getInputSchema() == null) { + ((POUserFunc)p).setFuncInputSchema(op.getSignature()); + } List cacheFiles = ((EvalFunc)f).getCacheFiles(); if (cacheFiles != null) { ((POUserFunc)p).setCacheFiles(cacheFiles.toArray(new String[cacheFiles.size()])); Added: pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java?rev=1563765&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java (added) +++ pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java Mon Feb 3 03:57:25 2014 @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.tools.pigstats; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobClient; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.impl.PigContext; + +/** + * SimpleFetchPigStats encapsulates dummy statistics of a fetch task, since during a fetch + * no MR jobs are executed + * + */ +public class SimpleFetchPigStats extends PigStats { + + private final List outputStatsList; + private final List inputStatsList; + private final JobGraph emptyJobPlan = new JobGraph(); + + public SimpleFetchPigStats(PigContext pigContext, POStore poStore) { + + super.pigContext = pigContext; + super.startTime = super.endTime = System.currentTimeMillis(); + super.userId = System.getProperty("user.name"); + + Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties()); + + //initialize empty stats + OutputStats os = new OutputStats(null, -1, -1, true); + os.setConf(conf); + os.setPOStore(poStore); + this.outputStatsList = Collections.unmodifiableList(Arrays.asList(os)); + + InputStats is = new InputStats(null, -1, -1, true); + is.setConf(conf); + this.inputStatsList = Collections.unmodifiableList(Arrays.asList(is)); + + } + + @Override + public JobClient getJobClient() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmbedded() { + return false; + } + + @Override + public Map> getAllStats() { + throw new UnsupportedOperationException(); + } + + @Override + public List getAllErrorMessages() { + throw new UnsupportedOperationException(); + } + + @Override + public JobGraph getJobGraph() { + return emptyJobPlan; + } + + @Override + public List getOutputLocations() { + return Collections.emptyList(); + } + + @Override + public List
getOutputNames() { + return Collections.emptyList(); + } + + @Override + public long getNumberBytes(String location) { + return -1L; + } + + @Override + public long getNumberRecords(String location) { + return -1L; + } + + @Override + public String getOutputAlias(String location) { + return null; + } + + @Override + public long getSMMSpillCount() { + return 0L; + } + + @Override + public long getProactiveSpillCountRecords() { + return 0L; + } + + @Override + public long getProactiveSpillCountObjects() { + return 0L; + } + + @Override + public long getBytesWritten() { + return 0L; + } + + @Override + public long getRecordWritten() { + return 0L; + } + + @Override + public int getNumberJobs() { + return 0; + } + + @Override + public List getOutputStats() { + return outputStatsList; + } + + @Override + public OutputStats result(String alias) { + return outputStatsList.get(0); + } + + @Override + public List getInputStats() { + return inputStatsList; + } + + @Override + public void setBackendException(String jobId, Exception e) { + throw new UnsupportedOperationException(); + } + + @Override + public void start() { + throw new UnsupportedOperationException(); + } + + @Override + public void stop() { + throw new UnsupportedOperationException(); + } + + @Override + public int getNumberSuccessfulJobs() { + return -1; + } + + @Override + public int getNumberFailedJobs() { + return -1; + } + +} Modified: pig/trunk/test/e2e/pig/tests/cmdline.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/cmdline.conf?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/cmdline.conf (original) +++ pig/trunk/test/e2e/pig/tests/cmdline.conf Mon Feb 3 03:57:25 2014 @@ -68,6 +68,7 @@ describe A;\, # #JIRA[PIG-373] # { # 'num' => 4, +# 'java_params' => ['-Dopt.fetch=false'], # 'pig' => q\ #A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray); #describe A; @@ -240,6 +241,7 @@ describe D;\, { 'num' => 1, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\ A = load ':INPATH:/singlefile/unicode100' as (name:chararray); dump A;\, Modified: pig/trunk/test/e2e/pig/tests/negative.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/negative.conf?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/negative.conf (original) +++ pig/trunk/test/e2e/pig/tests/negative.conf Mon Feb 3 03:57:25 2014 @@ -34,6 +34,7 @@ $cfg = { 'tests' => [ { 'num' => 1, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\ a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = group a by name; @@ -48,6 +49,7 @@ dump c;\, 'tests' => [ { 'num' => 1, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\a = load '/user/gates/nosuchfile'; dump a;\, 'expected_err_regex' => "ERROR 2118: Input path does not exist", }, @@ -248,6 +250,7 @@ store a into ':INPATH:/singlefile/fileex { # missing quotes around command 'num' => 1, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q# A = load ':INPATH:/singlefile/studenttab10k'; B = foreach A generate $2, $1, $0; @@ -259,6 +262,7 @@ dump C;#, { # input spec missing parenthesis 'num' => 2, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q# define CMD `perl PigStreaming.pl foo -` input 'foo' using PigStorage() ship(':SCRIPTHOMEPATH:/PigStreaming.pl'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -269,6 +273,7 @@ dump B;#, { # no 
serializer name after using 'num' => 3, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q# define CMD `perl PigStreaming.pl foo -` output ('foo' using ); A = load ':INPATH:/singlefile/studenttab10k'; @@ -279,6 +284,7 @@ dump B;#, { # alias name missing from define 'num' => 4, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q# define `perl PigStreaming.pl foo -`; A = load ':INPATH:/singlefile/studenttab10k'; @@ -289,6 +295,7 @@ dump B;#, { # quotes missing from name of the file in ship script 'num' => 5, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q# define CMD `perl PigStreaming.pl foo -` ship(:SCRIPTHOMEPATH:/PigStreaming.pl); A = load ':INPATH:/singlefile/studenttab10k'; @@ -306,6 +313,7 @@ dump B;#, # Define uses using non-existent command (autoship) 'num' => 1, 'execonly' => 'mapred', + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\ define CMD `perl PigStreamingNotThere.pl`; A = load ':INPATH:/singlefile/studenttab10k'; @@ -316,6 +324,7 @@ dump B;\, { # Define uses non-existent command with ship clause 'num' => 2, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\ define CMD `perl PigStreamingNotThere.pl foo -` ship(':SCRIPTHOMEPATH:/PigStreamingNotThere.pl'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -339,6 +348,7 @@ dump E;\, { # Define uses non-existent serializer 'num' => 4, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\ define CMD `perl PigStreaming.pl foo -` input('foo' using SerializerNotThere()) ship(':SCRIPTHOMEPATH:/PigStreaming.pl'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -349,6 +359,7 @@ dump B;\, { # Define uses non-existent deserializer 'num' => 5, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\ define CMD `perl PigStreaming.pl` output(stdout using DeserializerNotThere()) ship(':SCRIPTHOMEPATH:/PigStreaming.pl'); A = load ':INPATH:/singlefile/studenttab10k'; @@ -359,6 +370,7 @@ dump B;\, { # Invalid skip path 'num' => 6, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\ set stream.skippath 'foo'; define CMD `perl PigStreaming.pl`; @@ -370,6 +382,7 @@ dump B;\, { # Invalid command alias in stream operator 'num' => 7, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\ define CMD `perl PigStreaming.pl`; A = load ':INPATH:/singlefile/studenttab10k'; @@ -380,6 +393,7 @@ dump B;\, { # Invalid operator alias in stream operator 'num' => 8, + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\ define CMD `perl PigStreaming.pl`; A = load ':INPATH:/singlefile/studenttab10k'; @@ -483,6 +497,7 @@ store D into ':OUTPATH:';\, # Define uses using non-existent command 'num' => 1, 'execonly' => 'local', + 'java_params' => ['-Dopt.fetch=false'], 'pig' => q\ define CMD `perl PigStreamingNotThere.pl`; A = load ':INPATH:/singlefile/studenttab10k'; Modified: pig/trunk/test/e2e/pig/tests/nightly.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/nightly.conf (original) +++ pig/trunk/test/e2e/pig/tests/nightly.conf Mon Feb 3 03:57:25 2014 @@ -4499,9 +4499,9 @@ store C into ':OUTPATH:';\, }, { # Test Union using merge with incompatible types. 
float->bytearray and chararray->bytearray - 'num' => 8, - 'delimiter' => ' ', - 'pig' => q\ + 'num' => 8, + 'delimiter' => ' ', + 'pig' => q\ A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int); B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:chararray); C = union onschema A, B; @@ -4511,17 +4511,18 @@ A = load ':INPATH:/singlefile/studenttab B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:bytearray); C = union A, B; store C into ':OUTPATH:';\, - } - ] + } + ] }, - { + { - # Test Union using merge with Simple data types - 'name' => 'UdfDistributedCache', + # Test Union using merge with Simple data types + 'name' => 'UdfDistributedCache', 'tests' => [ - { - 'num' => 1, + { + 'num' => 1, + 'java_params' => ['-Dopt.fetch=false'], 'execonly' => 'mapred', # since distributed cache is not supported in local mode 'pig' => q? register :FUNCPATH:/testudf.jar; @@ -4531,8 +4532,8 @@ store C into ':OUTPATH:';\, c = foreach b generate udfdc(age); dump c;?, 'expected_out_regex' => ":UdfDistributedCache_1_out:", - }, - ] + }, + ] }, { 'name' => 'MonitoredUDF', 'tests' => [ Modified: pig/trunk/test/org/apache/pig/test/TestAssert.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAssert.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestAssert.java (original) +++ pig/trunk/test/org/apache/pig/test/TestAssert.java Mon Feb 3 03:57:25 2014 @@ -22,10 +22,12 @@ import static org.apache.pig.builtin.moc import static org.apache.pig.builtin.mock.Storage.tuple; import java.util.List; +import java.util.Properties; import junit.framework.Assert; import org.apache.pig.ExecType; +import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; import org.apache.pig.builtin.mock.Storage.Data; import org.apache.pig.data.Tuple; @@ -84,9 +86,36 @@ public class TestAssert { try { pigServer.openIterator("A"); } catch (FrontendException fe) { + Assert.assertTrue(fe.getCause().getCause().getMessage().contains("Assertion violated")); + } + } + + /** + * Verify that the ASSERT operator works. Fetch is disabled for this test case.
+ * @throws Exception + */ + @Test + public void testNegativeWithoutFetch() throws Exception { + PigServer pigServer = new PigServer(ExecType.LOCAL); + Data data = resetData(pigServer); + + data.set("foo", + tuple(1), + tuple(2), + tuple(3) + ); + + pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (i:int);"); + pigServer.registerQuery("ASSERT A BY i > 1 , 'i should be greater than 1';"); + + Properties props = pigServer.getPigContext().getProperties(); + props.setProperty(PigConfiguration.OPT_FETCH, "false"); + try { + pigServer.openIterator("A"); + } catch (FrontendException fe) { Assert.assertTrue(fe.getCause().getMessage().contains( "Job terminated with anomalous status FAILED")); } - } + } Modified: pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java (original) +++ pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java Mon Feb 3 03:57:25 2014 @@ -78,6 +78,7 @@ public class TestAutoLocalMode { @Before public void setUp() throws Exception{ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + pigServer.getPigContext().getExecutionEngine().setProperty(PigConfiguration.OPT_FETCH, "false"); pigServer.getPigContext().getExecutionEngine().setProperty(PigConfiguration.PIG_AUTO_LOCAL_ENABLED, String.valueOf("true")); pigServer.getPigContext().getExecutionEngine().setProperty(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, "200"); Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1563765&r1=1563764&r2=1563765&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original) +++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Feb 3 03:57:25 2014 @@ -29,10 +29,12 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Properties; import java.util.Random; import org.apache.pig.EvalFunc; import org.apache.pig.ExecType; +import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.PigServer; import org.apache.pig.builtin.BinStorage; @@ -1011,17 +1013,22 @@ public class TestEvalPipeline2 { Iterator<Tuple> iter = pigServer.openIterator("e"); + Map<Integer, Object> expected = new HashMap<Integer, Object>(3); + expected.put(1, null); + expected.put(2, null); + expected.put(4, null); + Tuple t = iter.next(); Assert.assertTrue(t.size()==1); - Assert.assertTrue((Integer)t.get(0)==1); + Assert.assertTrue(expected.containsKey(t.get(0))); t = iter.next(); Assert.assertTrue(t.size()==1); - Assert.assertTrue((Integer)t.get(0)==4); + Assert.assertTrue(expected.containsKey(t.get(0))); t = iter.next(); Assert.assertTrue(t.size()==1); - Assert.assertTrue((Integer)t.get(0)==2); + Assert.assertTrue(expected.containsKey(t.get(0))); Assert.assertFalse(iter.hasNext()); } @@ -1595,10 +1602,36 @@ public class TestEvalPipeline2 { pigServer.openIterator("b"); Assert.fail(); } catch (Exception e) { - Assert.assertTrue(e.getMessage().contains(ArrayList.class.getName())); + String message = e.getCause().getCause().getMessage(); + Assert.assertTrue(message.contains(ArrayList.class.getName())); } } + // See PIG-1826 + @Test + public
void testNonStandardDataWithoutFetch() throws Exception{ + Properties props = pigServer.getPigContext().getProperties(); + props.setProperty(PigConfiguration.OPT_FETCH, "false"); + String[] input1 = { + "0", + }; + try { + Util.createInputFile(cluster, "table_testNonStandardDataWithoutFetch", input1); + pigServer.registerQuery("a = load 'table_testNonStandardDataWithoutFetch' as (a0);"); + pigServer.registerQuery("b = foreach a generate " + UDFWithNonStandardType.class.getName() + "(a0);"); + + try { + pigServer.openIterator("b"); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains(ArrayList.class.getName())); + } + } + finally { + props.setProperty(PigConfiguration.OPT_FETCH, "true"); + } + } + // See PIG-2078 @Test public void testProjectNullBag() throws Exception{ Added: pig/trunk/test/org/apache/pig/test/TestFetch.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFetch.java?rev=1563765&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestFetch.java (added) +++ pig/trunk/test/org/apache/pig/test/TestFetch.java Mon Feb 3 03:57:25 2014 @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.PrintStream; +import java.util.Iterator; +import java.util.Properties; +import java.util.Random; + +import org.apache.pig.ExecType; +import org.apache.pig.PigConfiguration; +import org.apache.pig.PigServer; +import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher; +import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.apache.pig.parser.ParserTestingUtils; +import org.apache.pig.test.utils.GenPhyOp; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestFetch { + + private PigServer pigServer; + + private static File inputFile1; + private static File inputFile2; + + private static final long SEED = 1013; + private static final Random r = new Random(SEED); + + @BeforeClass + public static void setUpOnce() throws Exception { + + String[] data1 = { + "1 {(1,2,7,8,b),(1,3,3,5,a)}", + "2 {(2,4,6,6,k)}", + "3 {(3,7,8,9,p),(3,6,3,1,n)}", + "5 {(5,1,1,2,c)}" + }; + + String[] data2 = { + "1 3 a", + "1 2 b", + "2 4 k", + "3 6 n", + "3 7 p", + "5 1 c" + }; + + inputFile1 = Util.createInputFile("tmp", "testFetchData1.txt", data1); + inputFile2 = Util.createInputFile("tmp", "testFetchData2.txt", data2); + + } + + @Before + public void setUp() throws Exception{ + pigServer = new PigServer(ExecType.LOCAL, new Properties()); + } + + @Test + public void test1() throws Exception { + String query = + "A = load '"+Util.encodeEscape(inputFile1.getAbsolutePath()) +"' " + + "using PigStorage(' ') as (a:int, b: " + + "{t:(t1:int,t2:int,t3:int,t4:int,c:chararray)});" + + "C = foreach A {" + + " temp1 = foreach b generate t1*100 as (key:int), ((t2+t3)*10) as (r:int);" + + " temp2 = filter temp1 by key < 400;" + + " temp3 = limit temp2 3;" + + " temp4 = foreach temp3 generate key-r as (id:int);" + + " temp5 = limit temp4 4;" + + " temp6 = filter temp5 by id < 100;" + + " generate flatten(temp6) as (id:int), a;" + + "};" + + "D = foreach C generate (" + + " case id % 4" + + " when 0 then true" + + " else false" + + " end" + + ") as (check:boolean);"; + + LogicalPlan lp = ParserTestingUtils.generateLogicalPlan(query); + + PhysicalPlan pp = ((MRExecutionEngine) pigServer.getPigContext().getExecutionEngine()) + .compile(lp, null); + + boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp); + assertTrue(planFetchable); + + } + + @Test + public void test2() throws Exception { + Properties properties = pigServer.getPigContext().getProperties(); + properties.setProperty(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC, "gz"); + 
properties.setProperty(PigConfiguration.PIG_ENABLE_TEMP_FILE_COMPRESSION, "true"); + properties.setProperty(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_STORAGE, "tfile"); + + String query = + "A = load '"+Util.encodeEscape(inputFile1.getAbsolutePath()) +"' " + + "using PigStorage(' ') as (a:int, b: " + + "{t:(t1:int,t2:int,t3:int,t4:int,c:chararray)});" + + "C = foreach A {" + + " temp1 = foreach b generate t1*100 as (key:int), ((t2+t3)*10) as (r:int);" + + " temp2 = filter temp1 by key < 400;" + + " temp3 = limit temp2 3;" + + " temp4 = foreach temp3 generate key-r as (id:int);" + + " temp5 = limit temp4 4;" + + " temp6 = filter temp5 by id < 100;" + + " generate flatten(temp6) as (id:int), a;" + + "};" + + "D = foreach C generate (" + + " case id % 4" + + " when 0 then true" + + " else false" + + " end" + + ") as (check:boolean);" + + "store D into 'out' using org.apache.pig.impl.io.TFileStorage();"; + + LogicalPlan lp = ParserTestingUtils.generateLogicalPlan(query); + + PhysicalPlan pp = ((MRExecutionEngine) pigServer.getPigContext().getExecutionEngine()) + .compile(lp, null); + + boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp); + assertFalse(planFetchable); + + } + + @Test + public void test3() throws Exception { + File scriptFile = null; + try { + String[] script = { + "A = load '"+Util.encodeEscape(inputFile1.getAbsolutePath()) +"' ", + "using PigStorage(' ') as (a:int, b: ", + "{t:(t1:int,t2:int,t3:int,t4:int,c:chararray)});", + "C = foreach A {", + " temp1 = foreach b generate t1*100 as (key:int), ((t2+t3)*10) as (r:int);", + " temp2 = filter temp1 by key < 400;", + " temp3 = limit temp2 3;", + " temp4 = foreach temp3 generate key-r as (id:int);", + " temp5 = limit temp4 4;", + " temp6 = filter temp5 by id < 100;", + " generate flatten(temp6) as (id:int), a;", + "};", + "D = foreach C generate (", + " case id % 4", + " when 0 then true", + " else false", + " end", + ") as (check:boolean);" + }; + + scriptFile = Util.createLocalInputFile( "testFetchTest3.pig", script); + pigServer.registerScript(scriptFile.getAbsolutePath()); + + Iterator<Tuple> it = pigServer.openIterator("D"); + while (it.hasNext()) { + assertEquals(false, it.next().get(0)); + assertEquals(true, it.next().get(0)); + } + } + finally { + if (scriptFile != null) { + scriptFile.delete(); + } + } + } + + @Test + public void test4() throws Exception { + File scriptFile = null; + try { + String[] script = { + "A = load '"+Util.encodeEscape(inputFile2.getAbsolutePath()) +"' ", + "using PigStorage(' ') as (a:int, b:int, c:chararray);", + "B = limit A 2;", + "C = limit A 1;", + "D = union A,B,C;" // introduces an implicit split operator + }; + + scriptFile = Util.createLocalInputFile( "testFetchTest4.pig", script); + pigServer.registerScript(scriptFile.getAbsolutePath()); + pigServer.setBatchOn(); + + LogicalPlan lp = TestPigStats.getLogicalPlan(pigServer); + PhysicalPlan pp = ((MRExecutionEngine) + pigServer.getPigContext().getExecutionEngine()).compile(lp, null); + boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp); + assertFalse(planFetchable); + + } + finally { + if (scriptFile != null) { + scriptFile.delete(); + } + } + } + + @Test + public void test5() throws Exception { + + File scriptFile = null; + try { + String[] script = { + "A = load '"+Util.encodeEscape(inputFile2.getAbsolutePath()) +"' ", + "using PigStorage(' ') as (a:int, b:int, c:chararray);", + "B = group A by a;" + }; + + scriptFile = Util.createLocalInputFile( "testFetchTest5.pig", script);
+ pigServer.registerScript(scriptFile.getAbsolutePath()); + pigServer.setBatchOn(); + + LogicalPlan lp = TestPigStats.getLogicalPlan(pigServer); + PhysicalPlan pp = ((MRExecutionEngine) + pigServer.getPigContext().getExecutionEngine()).compile(lp, null); + boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp); + assertFalse(planFetchable); + + } + finally { + if (scriptFile != null) { + scriptFile.delete(); + } + } + } + + @Test + public void test6() throws Exception { + PigContext pc = pigServer.getPigContext(); + + PhysicalPlan pp = new PhysicalPlan(); + POLoad poLoad = GenPhyOp.topLoadOp(); + pp.add(poLoad); + POLimit poLimit = new POLimit(new OperatorKey("", r.nextLong()), -1, null); + pp.add(poLimit); + pp.connect(poLoad, poLimit); + POStore poStore = GenPhyOp.topStoreOp(); + pp.addAsLeaf(poStore); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + new FetchLauncher(pc).explain(pp, pc, ps, "xml"); + assertTrue(baos.toString().matches("(?si).*No MR jobs. Fetch only.*")); + + } + + @AfterClass + public static void tearDownOnce() throws Exception { + inputFile1.delete(); + inputFile2.delete(); + } + +}
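For reference, every test change above toggles the same switch: the new opt.fetch property, exposed in code as PigConfiguration.OPT_FETCH. Below is a minimal client-side sketch of the behavior under test, assuming fetch defaults to enabled; the class name FetchToggleSketch, the input file small.txt, and the query itself are hypothetical and not part of this patch:

    import java.util.Iterator;
    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigConfiguration;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class FetchToggleSketch {
        public static void main(String[] args) throws Exception {
            PigServer pigServer = new PigServer(ExecType.LOCAL, new Properties());

            // A simple load/limit pipeline is a candidate for the fetch path,
            // so openIterator() can read the input directly rather than
            // submitting an MR job (compare the load-limit-store plan that
            // test6 above explains as "No MR jobs. Fetch only").
            pigServer.registerQuery("A = load 'small.txt' as (line:chararray);");
            pigServer.registerQuery("B = limit A 10;");
            Iterator<Tuple> it = pigServer.openIterator("B");
            while (it.hasNext()) {
                System.out.println(it.next());
            }

            // Setting opt.fetch to false, as the updated tests above do,
            // forces the regular MR execution path instead.
            pigServer.getPigContext().getProperties()
                    .setProperty(PigConfiguration.OPT_FETCH, "false");
        }
    }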