pig-commits mailing list archives

From da...@apache.org
Subject svn commit: r1797099 - in /pig/trunk: src/docs/src/documentation/content/xdocs/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ s...
Date Wed, 31 May 2017 19:00:08 GMT
Author: daijy
Date: Wed May 31 19:00:07 2017
New Revision: 1797099

URL: http://svn.apache.org/viewvc?rev=1797099&view=rev
Log:
PIG-5216: Customizable Error Handling for Loaders in Pig (chenjunz via daijy)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LoadFuncDecorator.java
    pig/trunk/test/org/apache/pig/test/TestErrorHandlingLoadAndStoreFunc.java
Removed:
    pig/trunk/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
Modified:
    pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
    pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java
    pig/trunk/src/org/apache/pig/ErrorHandler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml Wed May 31 19:00:07 2017
@@ -1204,7 +1204,7 @@ This interface has methods to interact w
 <li id="storeresources"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreResources.java?view=markup">StoreResources:</a> 
 This interface has methods to put hdfs files or local files to distributed cache. </li>
 <li id="errorhandling"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandling.java?view=markup">ErrorHandling:</a> 
-This interface allow you to skip bad records in the storer so the storer will not throw exception and terminate the job. You can implement your own error handler by overriding <a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandler.java?view=markup">ErrorHandler</a> interface, or use predefined error handler: <a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java?view=markup">CounterBasedErrorHandler</a>. ErrorHandling can be turned on by setting the property pig.error-handling.enabled to true in pig.properties. Default is false.  CounterBasedErrorHandler uses two settings - pig.error-handling.min.error.records (the minimum number of errors to trigger error handling) and pig.error-handling.error.threshold (percentage of the number of records as a fraction exceeding which error is thrown).</li>
+This interface allows you to skip bad records in both the loader and the storer so they will not throw an exception and terminate the job. You can implement your own error handler by implementing the <a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandler.java?view=markup">ErrorHandler</a> interface, or use the predefined error handler <a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java?view=markup">CounterBasedErrorHandler</a>. ErrorHandling can be turned on by setting the property pig.error-handling.enabled to true in pig.properties. Default is false. CounterBasedErrorHandler uses two settings - pig.error-handling.min.error.records (the minimum number of errors required before error handling is triggered) and pig.error-handling.error.threshold (the fraction of records allowed to fail; an error is thrown once this fraction is exceeded).</li>
 </ul>
 
 <p id="storefunc-override">The methods which need to be overridden in StoreFunc are explained below: </p>

Modified: pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java (original)
+++ pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java Wed May 31 19:00:07 2017
@@ -19,15 +19,14 @@ package org.apache.pig;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counter;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public class CounterBasedErrorHandler implements ErrorHandler {
 
-    public static final String STORER_ERROR_HANDLER_COUNTER_GROUP = "storer_Error_Handler";
-    public static final String STORER_ERROR_COUNT = "bad_record_count";
-    public static final String STORER_RECORD_COUNT = "record__count";
+    public static final String ERROR_HANDLER_COUNTER_GROUP = "error_Handler";
+    public static final String ERROR_COUNT = "bad_record_count";
+    public static final String RECORD_COUNT = "record__count";
 
     private final long minErrors;
     private final float errorThreshold; // fraction of errors allowed
@@ -42,18 +41,18 @@ public class CounterBasedErrorHandler im
 
     @Override
     public void onSuccess(String uniqueSignature) {
-        incAndGetCounter(uniqueSignature, STORER_RECORD_COUNT);
+        incAndGetCounter(uniqueSignature, RECORD_COUNT);
     }
 
     @Override
-    public void onError(String uniqueSignature, Exception e, Tuple inputTuple) {
-        long numErrors = incAndGetCounter(uniqueSignature, STORER_ERROR_COUNT);
-        long numRecords = incAndGetCounter(uniqueSignature, STORER_RECORD_COUNT);
+    public void onError(String uniqueSignature, Exception e) {
+        long numErrors = incAndGetCounter(uniqueSignature, ERROR_COUNT);
+        long numRecords = incAndGetCounter(uniqueSignature, RECORD_COUNT);
         boolean exceedThreshold = hasErrorExceededThreshold(numErrors,
                 numRecords);
         if (exceedThreshold) {
             throw new RuntimeException(
-                    "Exceeded the error rate while writing records. The latest error seen  ",
+                    "Exceeded the error rate while processing records. The latest error seen  ",
                     e);
         }
     }
@@ -71,37 +70,37 @@ public class CounterBasedErrorHandler im
         return false;
     }
 
-    public long getRecordCount(String storeSignature) {
-        Counter counter = getCounter(storeSignature, STORER_RECORD_COUNT);
+    public long getRecordCount(String signature) {
+        Counter counter = getCounter(signature, RECORD_COUNT);
         return counter.getValue();
     }
 
-    private long incAndGetCounter(String storeSignature, String counterName) {
-        Counter counter = getCounter(storeSignature, counterName);
+    private long incAndGetCounter(String signature, String counterName) {
+        Counter counter = getCounter(signature, counterName);
         counter.increment(1);
         return counter.getValue();
     }
 
     /**
-     * Get Counter for a given counterName and Store Signature
+     * Get Counter for a given counterName and signature
      * 
      * @param counterName
-     * @param storeSignature
+     * @param signature
      * @return
      */
-    private Counter getCounter(String storeSignature, String counterName) {
+    private Counter getCounter(String signature, String counterName) {
         PigStatusReporter reporter = PigStatusReporter.getInstance();
         @SuppressWarnings("deprecation")
         Counter counter = reporter.getCounter(
-                STORER_ERROR_HANDLER_COUNTER_GROUP,
-                getCounterNameForStore(counterName, storeSignature));
+                ERROR_HANDLER_COUNTER_GROUP,
+                getCounterNameForStore(counterName, signature));
         return counter;
     }
 
     private String getCounterNameForStore(String counterNamePrefix,
-            String storeSignature) {
+            String signature) {
         StringBuilder counterName = new StringBuilder()
-                .append(counterNamePrefix).append("_").append(storeSignature);
+                .append(counterNamePrefix).append("_").append(signature);
         return counterName.toString();
     }
 }
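
The body of hasErrorExceededThreshold is outside this hunk; as a rough sketch of how the two settings are meant to interact (an assumption, not the committed implementation):

// Sketch: stay quiet until minErrors failures have been seen, then fail
// once the fraction of failed records exceeds errorThreshold.
private boolean hasErrorExceededThreshold(long numErrors, long numRecords) {
    if (numErrors < minErrors) {
        return false;
    }
    return numRecords > 0 && ((float) numErrors / numRecords) > errorThreshold;
}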

Modified: pig/trunk/src/org/apache/pig/ErrorHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandler.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/ErrorHandler.java (original)
+++ pig/trunk/src/org/apache/pig/ErrorHandler.java Wed May 31 19:00:07 2017
@@ -43,5 +43,5 @@ public interface ErrorHandler {
      * @param inputTuple
      *            the tuple to store.
      */
-    public void onError(String uniqueSignature, Exception e, Tuple inputTuple);
+    public void onError(String uniqueSignature, Exception e);
 }
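
With the inputTuple parameter removed, a custom handler sees only the UDF signature and the exception. A minimal sketch (assuming onSuccess and onError are the only methods the interface declares; the class is hypothetical):

import org.apache.pig.ErrorHandler;

// Log-and-continue handler that never fails the job.
public class LoggingErrorHandler implements ErrorHandler {
    @Override
    public void onSuccess(String uniqueSignature) {
        // nothing to track on success
    }

    @Override
    public void onError(String uniqueSignature, Exception e) {
        System.err.println("Skipped bad record for " + uniqueSignature
                + ": " + e.getMessage());
    }
}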

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed May 31 19:00:07 2017
@@ -508,7 +508,7 @@ public class JobControlCompiler{
 
         Configuration conf = nwJob.getConfiguration();
 
-        ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
+        ArrayList<POLoad> inp = new ArrayList<POLoad>();
         ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
         ArrayList<String> inpSignatureLists = new ArrayList<String>();
         ArrayList<Long> inpLimits = new ArrayList<Long>();
@@ -548,8 +548,9 @@ public class JobControlCompiler{
                     LoadFunc lf = ld.getLoadFunc();
                     lf.setLocation(ld.getLFile().getFileName(), nwJob);
 
+                    ld.setParentPlan(null);
                     //Store the inp filespecs
-                    inp.add(ld.getLFile());
+                    inp.add(ld);
                 }
             }
 
@@ -704,7 +705,7 @@ public class JobControlCompiler{
             if(Utils.isLocal(pigContext, conf)) {
                 ConfigurationUtil.replaceConfigForLocalMode(conf);
             }
-            conf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(inp));
+            conf.set(PigInputFormat.PIG_LOADS, ObjectSerializer.serialize(inp));
             conf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets));
             conf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(inpSignatureLists));
             conf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits));

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Wed May 31 19:00:07 2017
@@ -41,6 +41,8 @@ import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LoadFuncDecorator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
@@ -56,7 +58,7 @@ public class PigInputFormat extends Inpu
     public static final Log log = LogFactory
             .getLog(PigInputFormat.class);
 
-    public static final String PIG_INPUTS = "pig.inputs";
+    public static final String PIG_LOADS = "pig.loads";
     public static final String PIG_INPUT_TARGETS = "pig.inpTargets";
     public static final String PIG_INPUT_SIGNATURES = "pig.inpSignatures";
     public static final String PIG_INPUT_LIMITS = "pig.inpLimits";
@@ -81,7 +83,7 @@ public class PigInputFormat extends Inpu
     protected static class RecordReaderFactory {
         protected InputFormat inputFormat;
         protected PigSplit pigSplit;
-        protected LoadFunc loadFunc;
+        protected LoadFuncDecorator decorator;
         protected TaskAttemptContext context;
         protected long limit;
 
@@ -106,7 +108,9 @@ public class PigInputFormat extends Inpu
             PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
                     .deserialize(conf.get("udf.import.list")));
             MapRedUtil.setupUDFContext(conf);
-            LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
+            POLoad poLoad = getLoadFunc(pigSplit.getInputIndex(), conf);
+            LoadFunc loadFunc = poLoad.getLoadFunc();
+            LoadFuncDecorator decorator = poLoad.getLoadFuncDecorator();
             // Pass loader signature to LoadFunc and to InputFormat through
             // the conf
             passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf);
@@ -122,13 +126,13 @@ public class PigInputFormat extends Inpu
 
             this.inputFormat = inputFormat;
             this.pigSplit = pigSplit;
-            this.loadFunc = loadFunc;
+            this.decorator = decorator;
             this.context = context;
             this.limit = inpLimitLists.get(pigSplit.getInputIndex());
         }
 
         public org.apache.hadoop.mapreduce.RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException {
-            return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, limit);
+            return new PigRecordReader(inputFormat, pigSplit, decorator, context, limit);
         }
     }
 
@@ -159,20 +163,19 @@ public class PigInputFormat extends Inpu
      * @throws IOException
      */
     @SuppressWarnings("unchecked")
-    private static LoadFunc getLoadFunc(int inputIndex, Configuration conf) throws IOException {
-        ArrayList<FileSpec> inputs =
-                (ArrayList<FileSpec>) ObjectSerializer.deserialize(
-                        conf.get(PIG_INPUTS));
-        FuncSpec loadFuncSpec = inputs.get(inputIndex).getFuncSpec();
-        return (LoadFunc) PigContext.instantiateFuncFromSpec(loadFuncSpec);
+    private static POLoad getLoadFunc(int inputIndex, Configuration conf) throws IOException {
+        ArrayList<POLoad> inputs =
+            (ArrayList<POLoad>) ObjectSerializer.deserialize(
+                     conf.get(PIG_LOADS));
+        return inputs.get(inputIndex);
     }
 
     @SuppressWarnings("unchecked")
     private static String getLoadLocation(int inputIndex, Configuration conf) throws IOException {
-        ArrayList<FileSpec> inputs =
-                (ArrayList<FileSpec>) ObjectSerializer.deserialize(
-                        conf.get(PIG_INPUTS));
-        return inputs.get(inputIndex).getFileName();
+        ArrayList<POLoad> inputs =
+                (ArrayList<POLoad>) ObjectSerializer.deserialize(
+                        conf.get(PIG_LOADS));
+        return inputs.get(inputIndex).getLFile().getFileName();
     }
 
     /**
@@ -210,11 +213,11 @@ public class PigInputFormat extends Inpu
 
         Configuration conf = jobcontext.getConfiguration();
 
-        ArrayList<FileSpec> inputs;
+        ArrayList<POLoad> inputs;
         ArrayList<ArrayList<OperatorKey>> inpTargets;
         try {
-            inputs = (ArrayList<FileSpec>) ObjectSerializer
-                    .deserialize(conf.get(PIG_INPUTS));
+            inputs = (ArrayList<POLoad>) ObjectSerializer
+                    .deserialize(conf.get(PIG_LOADS));
             inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUT_TARGETS));
             PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
@@ -228,7 +231,7 @@ public class PigInputFormat extends Inpu
         ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
         for (int i = 0; i < inputs.size(); i++) {
             try {
-                Path path = new Path(inputs.get(i).getFileName());
+                Path path = new Path(inputs.get(i).getLFile().getFileName());
 
                 FileSystem fs;
                 boolean isFsPath = true;
@@ -257,7 +260,7 @@ public class PigInputFormat extends Inpu
                 // FileInputFormat stores this in mapred.input.dir in the conf),
                 // then for different inputs, the loader's don't end up
                 // over-writing the same conf.
-                FuncSpec loadFuncSpec = inputs.get(i).getFuncSpec();
+                FuncSpec loadFuncSpec = inputs.get(i).getLFile().getFuncSpec();
                 LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(
                         loadFuncSpec);
                 boolean combinable = !(loadFunc instanceof MergeJoinIndexer
@@ -270,7 +273,7 @@ public class PigInputFormat extends Inpu
                 // Pass loader signature to LoadFunc and to InputFormat through
                 // the conf
                 passLoadSignature(loadFunc, i, inputSpecificJob.getConfiguration());
-                loadFunc.setLocation(inputs.get(i).getFileName(),
+                loadFunc.setLocation(inputs.get(i).getLFile().getFileName(),
                         inputSpecificJob);
                 // The above setLocation call could write to the conf within
                 // the inputSpecificJob - use this updated conf
@@ -289,7 +292,8 @@ public class PigInputFormat extends Inpu
                 throw ee;
             } catch (Exception e) {
                 int errCode = 2118;
-                String msg = "Unable to create input splits for: " + inputs.get(i).getFileName();
+                String msg = "Unable to create input splits for: " +
+                        inputs.get(i).getLFile().getFileName();
                 if(e.getMessage() !=null && (!e.getMessage().isEmpty()) ){
                     throw new ExecException(e.getMessage(), errCode, PigException.BUG, e);
                 }else{

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Wed May 31 19:00:07 2017
@@ -34,8 +34,9 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LoadFuncDecorator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -72,6 +73,9 @@ public class PigRecordReader extends Rec
     // the loader object
     private LoadFunc loadfunc;
 
+    // the LoadFuncDecorator
+    private LoadFuncDecorator decorator;
+
     // the Hadoop counter name
     transient private String counterName = null;
 
@@ -107,10 +111,11 @@ public class PigRecordReader extends Rec
      *
      */
     public PigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit,
-            LoadFunc loadFunc, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
+            LoadFuncDecorator decorator, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
         this.inputformat = inputformat;
         this.pigSplit = pigSplit;
-        this.loadfunc = loadFunc;
+        this.decorator = decorator;
+        this.loadfunc = decorator.getLoader();
         this.context = context;
         this.reporter = PigStatusReporter.getInstance();
         this.inputSpecificConf = context.getConfiguration();
@@ -121,7 +126,7 @@ public class PigRecordReader extends Rec
         initNextRecordReader();
         doTiming = inputSpecificConf.getBoolean(PIG_UDF_PROFILE, false);
         if (doTiming) {
-            counterGroup = loadFunc.toString();
+            counterGroup = loadfunc.toString();
             timingFrequency = inputSpecificConf.getLong(PIG_UDF_PROFILE_FREQUENCY, 100L);
         }
     }
@@ -201,7 +206,7 @@ public class PigRecordReader extends Rec
         if (timeThis) {
             startNanos = System.nanoTime();
         }
-        while ((curReader == null) || (curValue = loadfunc.getNext()) == null) {
+        while ((curReader == null) || (curValue = decorator.getNext()) == null) {
             if (!initNextRecordReader()) {
               return false;
             }
@@ -217,10 +222,10 @@ public class PigRecordReader extends Rec
     @SuppressWarnings("unchecked")
     private static String getMultiInputsCounerName(PigSplit pigSplit,
             Configuration conf) throws IOException {
-        ArrayList<FileSpec> inputs =
-            (ArrayList<FileSpec>) ObjectSerializer.deserialize(
-                    conf.get(PigInputFormat.PIG_INPUTS));
-        String fname = inputs.get(pigSplit.getInputIndex()).getFileName();
+        ArrayList<POLoad> inputs =
+                (ArrayList<POLoad>) ObjectSerializer.deserialize(
+                        conf.get(PigInputFormat.PIG_LOADS));
+        String fname = inputs.get(pigSplit.getInputIndex()).getLFile().getFileName();
         return PigStatsUtil.getMultiInputsCounterName(fname, pigSplit.getInputIndex());
     }
 

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LoadFuncDecorator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LoadFuncDecorator.java?rev=1797099&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LoadFuncDecorator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LoadFuncDecorator.java Wed May 31 19:00:07 2017
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+
+import org.apache.pig.ErrorHandler;
+import org.apache.pig.ErrorHandling;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * This class decorates {@code LoadFunc#getNext()}. It reports each record
+ * to the configured {@code ErrorHandler}, calling
+ * {@code ErrorHandler#onError(String, Exception)} on failure, if the
+ * {@link LoadFunc} implements {@link ErrorHandling}.
+ *
+ */
+
+public class LoadFuncDecorator {
+    private final LoadFunc loader;
+    private final String udfSignature;
+    private boolean shouldHandleErrors;
+    private ErrorHandler errorHandler;
+
+    public LoadFuncDecorator(LoadFunc loader, String udfSignature) {
+        this.loader = loader;
+        this.udfSignature = udfSignature;
+        init();
+    }
+
+    private void init() {
+        // The decorator's work happens on the backend only, so we do not
+        // create an error handler on the frontend
+        if (UDFContext.getUDFContext().isFrontend()) {
+            return;
+        }
+        if (loader instanceof ErrorHandling && allowErrors()) {
+            errorHandler = ((ErrorHandling) loader).getErrorHandler();
+            shouldHandleErrors = true;
+        }
+    }
+
+    private boolean allowErrors() {
+        return UDFContext.getUDFContext().getJobConf()
+                .getBoolean(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, false);
+    }
+
+    /**
+     * Call {@code LoadFunc#getNext()} and handle errors
+     *
+     * @throws IOException
+     */
+    public Tuple getNext() throws IOException {
+        Tuple t = null;
+        try {
+            t = loader.getNext();
+            if (shouldHandleErrors) {
+                errorHandler.onSuccess(udfSignature);
+            }
+        } catch (Exception e) {
+            if (shouldHandleErrors) {
+                errorHandler.onError(udfSignature, e);
+            } else {
+                throw new IOException(e);
+            }
+        }
+        return t;
+    }
+
+    public LoadFunc getLoader() {
+        return loader;
+    }
+
+    public boolean getErrorHandling() {
+        return shouldHandleErrors;
+    }
+}
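
A hedged usage sketch of the decorator's contract (class and method names are illustrative): once error handling is enabled, getNext() returns null both when a record is skipped and at end of input, which is why POLoad below consults getErrorHandling() before treating null as end-of-plan.

import java.io.IOException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LoadFuncDecorator;
import org.apache.pig.data.Tuple;

public class DecoratorSketch {
    static Tuple readOne(LoadFuncDecorator decorator) throws IOException {
        Tuple t = decorator.getNext();
        if (t == null && !decorator.getErrorHandling()) {
            // Only without error handling does null reliably mean end of input.
        }
        return t;
    }
}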

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Wed May 31 19:00:07 2017
@@ -54,6 +54,7 @@ public class POLoad extends PhysicalOper
     private static final long serialVersionUID = 1L;
     // The user defined load function or a default load function
     private transient LoadFunc loader = null;
+    private transient LoadFuncDecorator lDecorator;
     // The filespec on which the operator is based
     FileSpec lFile;
     // PigContext passed to us by the operator creator
@@ -100,6 +101,7 @@ public class POLoad extends PhysicalOper
                 PigContext.instantiateFuncFromSpec(lFile.getFuncSpec()), 
                 ConfigurationUtil.toConfiguration(pc.getProperties()), 
                 lFile.getFileName(),0, signature);
+        setLoadFuncDecorator(new LoadFuncDecorator(loader, signature));
     }
     
     /**
@@ -134,8 +136,8 @@ public class POLoad extends PhysicalOper
         }
         Result res = new Result();
         try {
-            res.result = loader.getNext();
-            if(res.result==null){
+            res.result = lDecorator.getNext();
+            if(res.result==null && !lDecorator.getErrorHandling()){
                 res.returnStatus = POStatus.STATUS_EOP;
                 tearDown();
             }
@@ -213,9 +215,20 @@ public class POLoad extends PhysicalOper
             this.loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
             this.loader.setUDFContextSignature(signature);
         }
+        if (lDecorator == null) {
+            setLoadFuncDecorator(new LoadFuncDecorator(loader, signature));
+        }
         return this.loader;
     }
     
+    void setLoadFuncDecorator(LoadFuncDecorator lDecorator) {
+        this.lDecorator = lDecorator;
+    }
+
+    public LoadFuncDecorator getLoadFuncDecorator() {
+        return lDecorator;
+    }
+
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         if(illustrator != null) {
           if (!illustrator.ceilingCheck()) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java Wed May 31 19:00:07 2017
@@ -78,7 +78,7 @@ public class StoreFuncDecorator {
             }
         } catch (Exception e) {
             if (shouldHandleErrors) {
-                errorHandler.onError(udfSignature, e, tuple);
+                errorHandler.onError(udfSignature, e);
             } else {
                 throw new IOException(e);
             }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java Wed May 31 19:00:07 2017
@@ -23,9 +23,9 @@ import java.io.IOException;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LoadFuncDecorator;
 
 /**
  * Record reader for Spark mode - handles SparkPigSplit
@@ -40,8 +40,8 @@ public class SparkPigRecordReader extend
      * @param context
      * @param limit
      */
-    public SparkPigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit, LoadFunc loadFunc, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
-        super(inputformat, pigSplit, loadFunc, context, limit);
+    public SparkPigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit, LoadFuncDecorator decorator, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
+        super(inputformat, pigSplit, decorator, context, limit);
     }
 
     @Override

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed May 31 19:00:07 2017
@@ -212,11 +212,6 @@ public class LoadConverter implements RD
 
         loadFunc.setLocation(poLoad.getLFile().getFileName(), job);
 
-        // stolen from JobControlCompiler
-        ArrayList<FileSpec> pigInputs = new ArrayList<FileSpec>();
-        // Store the inp filespecs
-        pigInputs.add(poLoad.getLFile());
-
         ArrayList<List<OperatorKey>> inpTargets = Lists.newArrayList();
         ArrayList<String> inpSignatures = Lists.newArrayList();
         ArrayList<Long> inpLimits = Lists.newArrayList();
@@ -234,7 +229,13 @@ public class LoadConverter implements RD
         inpSignatures.add(poLoad.getSignature());
         inpLimits.add(poLoad.getLimit());
 
-        jobConf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(pigInputs));
+        // stolen from JobControlCompiler
+        PhysicalPlan pp = poLoad.getParentPlan();
+        ArrayList<POLoad> pigLoads = new ArrayList<POLoad>();
+        poLoad.setParentPlan(null);
+        pigLoads.add(poLoad);
+        jobConf.set(PigInputFormat.PIG_LOADS, ObjectSerializer.serialize(pigLoads));
+        poLoad.setParentPlan(pp);
         jobConf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets));
         jobConf.set(PigInputFormat.PIG_INPUT_SIGNATURES,
                 ObjectSerializer.serialize(inpSignatures));

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed May 31 19:00:07 2017
@@ -128,7 +128,7 @@ public class PigInputFormatSpark extends
 
         @Override
         public RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException {
-            return new SparkPigRecordReader(inputFormat, pigSplit, loadFunc, context, limit);
+            return new SparkPigRecordReader(inputFormat, pigSplit, decorator, context, limit);
         }
     }
 }
\ No newline at end of file

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed May 31 19:00:07 2017
@@ -652,7 +652,7 @@ public class TezDagBuilder extends TezOp
         }
 
         if (!(tezOp.getLoaderInfo().getLoads().isEmpty())) {
-            payloadConf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
+            payloadConf.set(PigInputFormat.PIG_LOADS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getLoads()));
             payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
             payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
             inputPayLoad = new Configuration(payloadConf);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Wed May 31 19:00:07 2017
@@ -201,15 +201,15 @@ public class TezOperator extends Operato
     private boolean isVertexGroup = false;
 
     public static class LoaderInfo implements Serializable {
-        private List<POLoad> loads = null;
+        private ArrayList<POLoad> loads = null;
         private ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
         private ArrayList<String> inpSignatureLists = new ArrayList<String>();
         private ArrayList<Long> inpLimits = new ArrayList<Long>();
         private transient InputSplitInfo inputSplitInfo = null;
-        public List<POLoad> getLoads() {
+        public ArrayList<POLoad> getLoads() {
             return loads;
         }
-        public void setLoads(List<POLoad> loads) {
+        public void setLoads(ArrayList<POLoad> loads) {
             this.loads = loads;
         }
         public ArrayList<FileSpec> getInp() {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Wed May 31 19:00:07 2017
@@ -81,7 +81,7 @@ public class LoaderProcessor extends Tez
      * @throws InterruptedException
      * @throws ClassNotFoundException
      */
-    private List<POLoad> processLoads(TezOperator tezOp
+    private ArrayList<POLoad> processLoads(TezOperator tezOp
             ) throws VisitorException, IOException, ClassNotFoundException, InterruptedException {
         ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
         ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
@@ -90,6 +90,7 @@ public class LoaderProcessor extends Tez
 
         List<POLoad> lds = PlanHelper.getPhysicalOperators(tezOp.plan,
                 POLoad.class);
+        ArrayList<POLoad> poLoads = new ArrayList<POLoad>();
 
         Job job = Job.getInstance(jobConf);
         Configuration conf = job.getConfiguration();
@@ -140,11 +141,13 @@ public class LoaderProcessor extends Tez
                 for (PhysicalOperator sucs : ldSucs) {
                     tezOp.plan.connect(tezLoad, sucs);
                 }
+                poLoads.add(ld);
             }
             UDFContext.getUDFContext().serialize(conf);
             conf.set("udf.import.list",
                     ObjectSerializer.serialize(PigContext.getPackageImportList()));
-            conf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(inp));
+
+            conf.set(PigInputFormat.PIG_LOADS, ObjectSerializer.serialize(poLoads));
             conf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets));
             conf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(inpSignatureLists));
             conf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits));
@@ -173,7 +176,7 @@ public class LoaderProcessor extends Tez
             tezOp.setRequestedParallelism(parallelism);
             tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job));
         }
-        return lds;
+        return poLoads;
     }
 
     @Override

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Wed May 31 19:00:07 2017
@@ -41,6 +41,8 @@ import org.apache.pig.PigCounters;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -82,7 +84,7 @@ public final class MRJobStats extends Jo
 
     private List<POStore> reduceStores = null;
 
-    private List<FileSpec> loads = null;
+    private List<POLoad> loads = null;
 
     private Boolean disableCounter = false;
 
@@ -215,8 +217,8 @@ public final class MRJobStats extends Jo
                     .get(JobControlCompiler.PIG_MAP_STORES));
             this.reduceStores = (List<POStore>) ObjectSerializer.deserialize(conf
                     .get(JobControlCompiler.PIG_REDUCE_STORES));
-            this.loads = (ArrayList<FileSpec>) ObjectSerializer.deserialize(conf
-                    .get("pig.inputs"));
+            this.loads = (ArrayList<POLoad>) ObjectSerializer.deserialize(conf
+                    .get(PigInputFormat.PIG_LOADS));
             this.disableCounter = conf.getBoolean("pig.disable.counter", false);
         } catch (IOException e) {
             LOG.warn("Failed to deserialize the store list", e);
@@ -481,7 +483,7 @@ public final class MRJobStats extends Jo
         }
 
         if (loads.size() == 1) {
-            FileSpec fsp = loads.get(0);
+            FileSpec fsp = loads.get(0).getLFile();
             if (!MRPigStatsUtil.isTempFile(fsp.getFileName())) {
                 long records = mapInputRecords;
                 InputStats is = new InputStats(fsp.getFileName(),
@@ -493,7 +495,7 @@ public final class MRJobStats extends Jo
             }
         } else {
             for (int i=0; i<loads.size(); i++) {
-                FileSpec fsp = loads.get(i);
+                FileSpec fsp = loads.get(i).getLFile();
                 if (MRPigStatsUtil.isTempFile(fsp.getFileName())) continue;
                 addOneInputStats(fsp.getFileName(), i);
             }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1797099&r1=1797098&r2=1797099&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Wed May 31 19:00:07 2017
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.Counters
 import org.apache.pig.PigCounters;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.impl.io.FileSpec;
@@ -72,7 +73,7 @@ public class TezVertexStats extends JobS
     private Map<String, Map<String, Long>> counters = null;
 
     private List<POStore> stores = null;
-    private List<FileSpec> loads = null;
+    private List<POLoad> loads = null;
 
     private int numTasks = 0;
     private long numInputRecords = 0;
@@ -139,8 +140,8 @@ public class TezVertexStats extends JobS
             // tez. For now, we keep it since it's used in PigOutputFormat.
             this.stores = (List<POStore>) ObjectSerializer.deserialize(
                     conf.get(JobControlCompiler.PIG_REDUCE_STORES));
-            this.loads = (List<FileSpec>) ObjectSerializer.deserialize(
-                    conf.get(PigInputFormat.PIG_INPUTS));
+            this.loads = (List<POLoad>) ObjectSerializer.deserialize(
+                    conf.get(PigInputFormat.PIG_LOADS));
         } catch (IOException e) {
             LOG.warn("Failed to deserialize the store list", e);
         }
@@ -241,13 +242,13 @@ public class TezVertexStats extends JobS
         }
 
         // There is always only one load in a Tez vertex
-        for (FileSpec fs : loads) {
+        for (POLoad fs : loads) {
             long records = -1;
             long hdfsBytesRead = -1;
-            String filename = fs.getFileName();
+            String filename = fs.getLFile().getFileName();
             if (counters != null) {
                 if (mIGroup != null) {
-                    Long n = mIGroup.get(PigStatsUtil.getMultiInputsCounterName(fs.getFileName(), 0));
+                    Long n = mIGroup.get(PigStatsUtil.getMultiInputsCounterName(fs.getLFile().getFileName(), 0));
                     if (n != null) records = n;
                 }
                 if (records == -1) {

Added: pig/trunk/test/org/apache/pig/test/TestErrorHandlingLoadAndStoreFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestErrorHandlingLoadAndStoreFunc.java?rev=1797099&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestErrorHandlingLoadAndStoreFunc.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestErrorHandlingLoadAndStoreFunc.java Wed May 31 19:00:07 2017
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.pig.CounterBasedErrorHandler;
+import org.apache.pig.ErrorHandling;
+import org.apache.pig.ExecType;
+import org.apache.pig.ErrorHandler;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * This class contains unit tests for load and store functions that have a
+ * certain error threshold set.
+ *
+ */
+public class TestErrorHandlingLoadAndStoreFunc {
+
+    private static PigServer pigServer;
+    private File tempDir;
+
+    @Before
+    public void setup() throws IOException {
+        pigServer = new PigServer(ExecType.LOCAL);
+        tempDir = Files.createTempDir();
+        tempDir.deleteOnExit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        pigServer.shutdown();
+        tempDir.delete();
+    }
+
+    public static class TestErroroneousStoreFunc extends PigStorage implements
+            ErrorHandling {
+        protected static AtomicLong COUNTER = new AtomicLong();
+
+        @Override
+        public void putNext(Tuple f) throws IOException {
+            long count = COUNTER.incrementAndGet();
+            super.putNext(f);
+            if (count % 3 == 0) {
+                throw new RuntimeException("Throw error for test");
+            }
+        }
+
+        @Override
+        public ErrorHandler getErrorHandler() {
+            return new CounterBasedErrorHandler();
+        }
+    }
+
+    public static class TestErroroneousStoreFunc2 extends PigStorage implements
+            ErrorHandling {
+        protected static AtomicLong COUNTER = new AtomicLong();
+
+        @Override
+        public void putNext(Tuple f) throws IOException {
+            long count = COUNTER.incrementAndGet();
+            super.putNext(f);
+            if (count % 3 == 0) {
+                throw new RuntimeException("Throw error for test");
+            }
+        }
+
+        @Override
+        public ErrorHandler getErrorHandler() {
+            return new CounterBasedErrorHandler();
+        }
+    }
+
+    public static class TestErroroneousLoadFunc extends PigStorage implements
+            ErrorHandling {
+        protected static AtomicLong COUNTER = new AtomicLong();
+
+        @Override
+        public Tuple getNext() throws IOException {
+            long count = COUNTER.incrementAndGet();
+            Tuple t = super.getNext();
+            if (count % 3 == 0) {
+                throw new RuntimeException("Throw error for test");
+            }
+            return t;
+        }
+
+        @Override
+        public ErrorHandler getErrorHandler() {
+            return new CounterBasedErrorHandler();
+        }
+    }
+
+    public static class TestErroroneousLoadFunc2 extends PigStorage implements
+            ErrorHandling {
+        protected static AtomicLong COUNTER = new AtomicLong();
+
+        @Override
+        public Tuple getNext() throws IOException {
+            long count = COUNTER.incrementAndGet();
+            Tuple t = super.getNext();
+            if (count % 3 == 0) {
+                throw new RuntimeException("Throw error for test");
+            }
+            return t;
+        }
+
+        @Override
+        public ErrorHandler getErrorHandler() {
+            return new CounterBasedErrorHandler();
+        }
+    }
+
+    /**
+     * Test Pig job succeeds even with errors within threshold
+     *
+     */
+    @Test
+    public void testStorerWithErrorInLimit() throws Exception {
+        updatePigProperties(true, 3L, 0.4);
+        runTestStore(JOB_STATUS.COMPLETED);
+    }
+
+    /**
+     * Test Pig job fails if errors exceed min errors and threshold
+     */
+    @Test
+    public void testStorerWithErrorOutExceedingLimit() throws Exception {
+        updatePigProperties(true, 2L, 0.3);
+        runTestStore(JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Test Pig Job fails on error if the config is set to false
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStorerWithConfigNotEnabled() throws Exception {
+        updatePigProperties(false, 3L, 0.3);
+        runTestStore(JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Test Pig job succeeds even with errors within threshold
+     *
+     */
+    @Test
+    public void testLoaderWithErrorInLimit() throws Exception {
+        updatePigProperties(true, 3L, 0.4);
+        runTestLoad(JOB_STATUS.COMPLETED);
+    }
+
+    /**
+     * Test Pig job fails if errors exceed min errors and threshold
+     */
+    @Test
+    public void testLoaderWithErrorOutExceedingLimit() throws Exception {
+        updatePigProperties(true, 1L, 0.1);
+        runTestLoad(JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Test Pig Job fails on error if the config is set to false
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testLoaderWithConfigNotEnabled() throws Exception {
+        updatePigProperties(false, 3L, 0.3);
+        runTestLoad(JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Test Pig Job with multiple stores.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMultiStore() throws Exception {
+        updatePigProperties(true, 3L, 0.4);
+        pigServer.getPigContext().getProperties()
+                .put(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+        Data data = Storage.resetData(pigServer);
+        final Collection<Tuple> list = Lists.newArrayList();
+        // Create input dataset
+        int rows = 10;
+        for (int i = 0; i < rows; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append(i);
+            t.append("a" + i);
+            list.add(t);
+        }
+        data.set("in", "id:int,name:chararray", list);
+        pigServer.setBatchOn();
+        String loadQuery = "A = LOAD 'in' using mock.Storage();";
+        pigServer.registerQuery(loadQuery);
+        String storeAQuery = "store A into '" + tempDir.getAbsolutePath()
+                + "/output' using " + TestErroroneousStoreFunc.class.getName()
+                + "();";
+        pigServer.registerQuery(storeAQuery);
+        pigServer.registerQuery("B = FILTER A by id >0;");
+        String storeBQuery = "store B into '" + tempDir.getAbsolutePath()
+                + "/output2' using "
+                + TestErroroneousStoreFunc2.class.getName() + "();";
+        pigServer.registerQuery(storeBQuery);
+        if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) {
+            throw new RuntimeException("Job failed", pigServer.executeBatch()
+                    .get(0).getException());
+        }
+    }
+
+    /**
+     * Test Pig Job with multiple loaders.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMultiLoad() throws Exception {
+        updatePigProperties(true, 3L, 0.4);
+        pigServer.getPigContext().getProperties()
+              .put(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+        Storage.resetData(pigServer);
+
+        String inputLocation1 = tempDir.getAbsolutePath() + "/ina";
+        String inputLocation2 = tempDir.getAbsolutePath() + "/inb";
+        writeFile(inputLocation1);
+        writeFile(inputLocation2);
+
+        pigServer.setBatchOn();
+        String loadQuery1 = "A = LOAD '" + inputLocation1 + "' USING " + TestErroroneousLoadFunc.class.getName() + "();";
+        pigServer.registerQuery(loadQuery1);
+        String loadQuery2 = "B = LOAD '" + inputLocation2 + "' USING " + TestErroroneousLoadFunc2.class.getName() + "();";
+        pigServer.registerQuery(loadQuery2);
+        String storeQuery1 = "store A into '/output1' using mock.Storage();";
+        pigServer.registerQuery(storeQuery1);
+        String storeQuery2 = "store B into '/output2' using mock.Storage();";
+        pigServer.registerQuery(storeQuery2);
+
+        if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) {
+            throw new RuntimeException("Job did not reach the expected status"
+                    + pigServer.executeBatch().get(0).getStatus());
+        }
+    }
+
+    private void runTestStore(JOB_STATUS expectedJobStatus) throws Exception {
+        Data data = Storage.resetData(pigServer);
+        final Collection<Tuple> list = Lists.newArrayList();
+        // Create input dataset
+        int rows = 10;
+        for (int i = 0; i < rows; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append(i);
+            t.append("a" + i);
+            list.add(t);
+        }
+        data.set("in", "id:int,name:chararray", list);
+
+        pigServer.setBatchOn();
+        String loadQuery = "A = LOAD 'in' USING mock.Storage();";
+        pigServer.registerQuery(loadQuery);
+        String storeQuery = "store A into '" + tempDir.getAbsolutePath()
+                + "/output' using " + TestErroroneousStoreFunc.class.getName()
+                + "();";
+        pigServer.registerQuery(storeQuery);
+
+        if (pigServer.executeBatch().get(0).getStatus() != expectedJobStatus) {
+            throw new RuntimeException("Job did not reach the expected status"
+                    + pigServer.executeBatch().get(0).getStatus());
+        }
+    }
+
+    private void runTestLoad(JOB_STATUS expectedJobStatus) throws Exception {
+        Storage.resetData(pigServer);
+        String inputLocation = tempDir.getAbsolutePath() + "/in";
+        writeFile(inputLocation);
+        pigServer.setBatchOn();
+        String loadQuery = "A = LOAD '" + inputLocation + "' USING " + TestErroroneousLoadFunc.class.getName() + "();";
+        pigServer.registerQuery(loadQuery);
+        String storeQuery = "store A into '" + tempDir.getAbsolutePath()
+                + "/output' using mock.Storage();";
+        pigServer.registerQuery(storeQuery);
+
+        if (pigServer.executeBatch().get(0).getStatus() != expectedJobStatus) {
+            throw new RuntimeException("Job did not reach the expected status"
+                    + pigServer.executeBatch().get(0).getStatus());
+        }
+    }
+
+    private void updatePigProperties(boolean allowErrors, long minErrors,
+            double errorThreshold) {
+        Properties properties = pigServer.getPigContext().getProperties();
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED,
+                Boolean.toString(allowErrors));
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
+                Long.toString(minErrors));
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT,
+                Double.toString(errorThreshold));
+    }
+
+    private void writeFile(String fileLocation) throws IOException {
+        try{
+            PrintWriter writer = new PrintWriter(fileLocation, "UTF-8");
+            int rows = 20;
+            for (int i = 0; i < rows; i++) {
+                writer.println("a" + i);
+            }
+            writer.close();
+        } catch (IOException e) {
+           throw new IOException(e);
+        }
+    }
+}


