pig-commits mailing list archives

From pradeep...@apache.org
Subject svn commit: r747954 [1/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLaye...
Date Wed, 25 Feb 2009 23:08:08 GMT
Author: pradeepkth
Date: Wed Feb 25 23:08:07 2009
New Revision: 747954

URL: http://svn.apache.org/viewvc?rev=747954&view=rev
Log:
PIG-591: Error handling phase 4 (sms via pradeepkth)

Added:
    hadoop/pig/trunk/src/org/apache/pig/PigWarning.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java
    hadoop/pig/trunk/src/org/apache/pig/Main.java
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/CompilationMessageCollector.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Feb 25 23:08:07 2009
@@ -430,3 +430,5 @@
     PIG-658: Data type long : When 'L' or 'l' is included with data 
     (123L or 123l) load produces null value. Also the case with Float (thejas
     via sms)
+
+    PIG-591: Error handling phase four (sms via pradeepkth)

Modified: hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/EvalFunc.java Wed Feb 25 23:08:07 2009
@@ -30,6 +30,7 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
 
 
@@ -48,6 +49,7 @@
     // if the exec is taking more than 300 ms
     protected PigProgressable reporter;
     protected Log log = LogFactory.getLog(getClass());
+    protected PigLogger pigLogger;
 
     private static int nextSchemaId; // for assigning unique ids to UDF columns
     protected String getSchemaName(String name, Schema input) {
@@ -118,7 +120,12 @@
     // report that progress is being made (otherwise hadoop times out after 600 seconds working on one outer tuple)
     public final void progress() {
         if (reporter != null) reporter.progress();
-        else log.warn("No reporter object provided to UDF " + this.getClass().getName());
+        else warn("No reporter object provided to UDF.", PigWarning.PROGRESS_REPORTER_NOT_PROVIDED);
+    }
+    
+    public final void warn(String msg, Enum warningEnum) {
+    	if(pigLogger != null) pigLogger.warn(this, msg, warningEnum);
+    	else log.warn("No logger object provided to UDF: " + this.getClass().getName() + ". " + msg);
     }
 
     /**
@@ -177,4 +184,16 @@
     public List<FuncSpec> getArgToFuncMapping() throws FrontendException{
         return null;
     }
+    
+    public PigLogger getPigLogger() {
+        return pigLogger;
+    }
+
+    public final void setPigLogger(PigLogger pigLogger) {
+        this.pigLogger = pigLogger;
+    }
+    
+    public Log getLogger() {
+    	return log;
+    }
 }

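For illustration, a UDF could use the warn() API added to EvalFunc above, paired with one of the UDF_WARNING placeholder enums from the new PigWarning class (added below). This is a sketch, not part of the commit; the class name SafeParseInt is hypothetical:

    import java.io.IOException;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.PigWarning;
    import org.apache.pig.data.Tuple;

    // Hypothetical UDF: when a PigLogger is set, warn() routes the
    // message to it (where it can be aggregated as a Hadoop counter);
    // otherwise it falls back to the commons-logging log.
    public class SafeParseInt extends EvalFunc<Integer> {
        @Override
        public Integer exec(Tuple input) throws IOException {
            try {
                return Integer.valueOf(String.valueOf(input.get(0)));
            } catch (NumberFormatException e) {
                // UDF_WARNING_1 is one of the placeholder enums that
                // PigWarning reserves for user-defined functions.
                warn("Could not parse input as int, returning null.",
                        PigWarning.UDF_WARNING_1);
                return null;
            }
        }
    }
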
Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Wed Feb 25 23:08:07 2009
@@ -40,6 +40,7 @@
 import org.apache.pig.tools.cmdline.CmdLineParser;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.grunt.PigCompletor;
+import org.apache.pig.tools.grunt.Utils;
 import org.apache.pig.tools.timer.PerformanceTimerFactory;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 
@@ -71,6 +72,10 @@
     int rc = 1;
     Properties properties = new Properties();
     PropertiesUtil.loadPropertiesFromFile(properties);
+    
+    boolean verbose = false;
+    boolean gruntCalled = false;
+    String logFileName = validateLogFile(null, null);
 
     try {
         BufferedReader pin = null;
@@ -96,6 +101,7 @@
         opts.registerOpt('m', "param_file", CmdLineParser.ValueExpected.OPTIONAL);
         opts.registerOpt('r', "dryrun", CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('l', "logfile", CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('w', "warning", CmdLineParser.ValueExpected.NOT_ACCEPTED);
 
         ExecMode mode = ExecMode.UNKNOWN;
         String file = null;
@@ -110,7 +116,8 @@
             cluster = clusterConfigured;
         }
         
-        String logFileName = validateLogFile(null, null);       
+        //by default warning aggregation is on
+        properties.setProperty("aggregate.warning", ""+true);
 
         char opt;
         while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
@@ -202,6 +209,11 @@
                             
             case 'v':
                 properties.setProperty(VERBOSE, ""+true);
+                verbose = true;
+                break;
+
+            case 'w':
+                properties.setProperty("aggregate.warning", ""+false);
                 break;
 
             case 'x':
@@ -252,6 +264,7 @@
                 new File(substFile).deleteOnExit();
             
             grunt = new Grunt(pin, pigContext);
+            gruntCalled = true;
             grunt.exec();
             rc = 0;
             return;
@@ -267,6 +280,7 @@
             }
             in = new BufferedReader(new StringReader(sb.toString()));
             grunt = new Grunt(in, pigContext);
+            gruntCalled = true;
             grunt.exec();
             rc = 0;
             return;
@@ -293,6 +307,7 @@
             ConsoleReaderInputStream inputStream = new ConsoleReaderInputStream(reader);
             grunt = new Grunt(new BufferedReader(new InputStreamReader(inputStream)), pigContext);
             grunt.setConsoleReader(reader);
+            gruntCalled = true;
             grunt.run();
             rc = 0;
             return;
@@ -320,6 +335,7 @@
                 new File(substFile).deleteOnExit();
 
             grunt = new Grunt(pin, pigContext);
+            gruntCalled = true;
             grunt.exec();
             rc = 0;
             return;
@@ -338,8 +354,14 @@
         } else {
             rc = 2;
         }
+        if(!gruntCalled) {
+        	Utils.writeLog(pe, logFileName, log, verbose);
+        }
     } catch (Throwable e) {
         rc = 2;
+        if(!gruntCalled) {
+        	Utils.writeLog(e, logFileName, log, verbose);
+        }
     } finally {
         // clear temp files
         FileLocalizer.deleteTempFiles();
@@ -455,6 +477,7 @@
     System.out.println("    -x, -exectype local|mapreduce, mapreduce is default");
     System.out.println("    -i, -version display version information");
     System.out.println("    -l, -logfile path to client side log file; current working directory is default");
+    System.out.println("    -w, -warning turn warning on; also turns warning aggregation off");
 }
 
 private static String validateLogFile(String logFileName, String scriptName) {

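For reference, the new -w switch only flips the aggregate.warning property seeded above; an embedding application could achieve the same effect directly. A sketch (the class name WarningSettings is hypothetical; the property name and PropertiesUtil call are taken from this diff):

    import java.util.Properties;

    import org.apache.pig.impl.util.PropertiesUtil;

    public class WarningSettings {
        public static void main(String[] args) {
            // Mirrors Main.java: aggregation defaults to on, and the -w
            // flag turns it off so each warning is reported individually.
            Properties properties = new Properties();
            PropertiesUtil.loadPropertiesFromFile(properties);
            properties.setProperty("aggregate.warning", "" + true);
            // equivalent of passing -w on the command line:
            // properties.setProperty("aggregate.warning", "" + false);
        }
    }
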
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed Feb 25 23:08:07 2009
@@ -61,6 +61,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.logicalLayer.LODefine;
@@ -103,6 +104,7 @@
     
     private String scope = constructScope();
     private ArrayList<String> cachedScript = new ArrayList<String>();
+    private boolean aggregateWarning = true;
     
     private String constructScope() {
         // scope servers for now as a session id
@@ -135,6 +137,9 @@
         if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
             setJobName("DefaultJobName") ;
         }
+        
+        aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+        
         if (connect) {
             pigContext.connect();
         }
@@ -729,33 +734,16 @@
             // throw.
             caught = fe;            
         }
-        // Check to see if we had any problems.
-        StringBuilder sb = new StringBuilder();
-        for (CompilationMessageCollector.Message msg : collector) {
-            switch (msg.getMessageType()) {
-            case Info:
-                log.info(msg.getMessage());
-                break;
-
-            case Warning:
-                log.warn(msg.getMessage());
-                break;
-
-            case Unknown:
-            case Error:
-                //Error messages are displayed separately and are not bundled here
-                break;
-
-            default:
-                throw new AssertionError("Unknown message type " +
-                    msg.getMessageType());
-
-            }
+        
+        if(aggregateWarning) {
+        	CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log);
+        } else {
+        	CompilationMessageCollector.logAllMessages(collector, log);
         }
-
-        if (sb.length() > 0 || caught != null) {
-            //int errCode = 1003;
-            //throw new FrontendException(sb.toString(), errCode, PigException.INPUT, false, null, caught);
+        
+        if (caught != null) {
             throw caught;
         }
 

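The collect-then-log flow that replaces the inline switch above is also used by the compiler and optimizer changes below. A sketch using only the CompilationMessageCollector calls that appear in this diff (the class name CollectorDemo is hypothetical):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.pig.PigWarning;
    import org.apache.pig.impl.plan.CompilationMessageCollector;
    import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;

    public class CollectorDemo {
        private static final Log log = LogFactory.getLog(CollectorDemo.class);

        public static void main(String[] args) {
            CompilationMessageCollector collector = new CompilationMessageCollector();
            // Visitors record warnings instead of logging them directly.
            collector.collect("Expected map to have single leaf!",
                    MessageType.Warning, PigWarning.MULTI_LEAF_MAP);
            // Aggregation on: one summed-up line per warning type.
            CompilationMessageCollector.logMessages(collector, MessageType.Warning, true, log);
            // Aggregation off: every message is logged individually.
            CompilationMessageCollector.logAllMessages(collector, log);
        }
    }
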
Added: hadoop/pig/trunk/src/org/apache/pig/PigWarning.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigWarning.java?rev=747954&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigWarning.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/PigWarning.java Wed Feb 25 23:08:07 2009
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+/**
+ * An enum to enumerate the warning types in Pig
+ * 
+ */
+public enum PigWarning {
+	DID_NOT_FIND_LOAD_ONLY_MAP_PLAN,
+	DIVIDE_BY_ZERO,
+	FIELD_DISCARDED,
+	GROUP_BY_INCOMPATIBLE_TYPES,
+	IMPLICIT_CAST_TO_BAG,
+	IMPLICIT_CAST_TO_CHARARRAY,
+	IMPLICIT_CAST_TO_DOUBLE,
+	IMPLICIT_CAST_TO_FLOAT,
+	IMPLICIT_CAST_TO_INT,
+	IMPLICIT_CAST_TO_LONG,
+	IMPLICIT_CAST_TO_MAP,
+	IMPLICIT_CAST_TO_TUPLE,
+	TOO_LARGE_FOR_INT,
+	MULTI_LEAF_MAP,
+	MULTI_LEAF_REDUCE,
+	NON_PACKAGE_REDUCE_PLAN_ROOT,
+	NON_EMPTY_COMBINE_PLAN,
+	PROGRESS_REPORTER_NOT_PROVIDED,
+	REDUCE_PLAN_NOT_EMPTY_WHILE_MAP_PLAN_UNDER_PROCESS,
+	UDF_WARNING_1, //placeholder for UDF warnings
+	UDF_WARNING_2, //placeholder for UDF warnings
+	UDF_WARNING_3, //placeholder for UDF warnings
+	UDF_WARNING_4, //placeholder for UDF warnings
+	UDF_WARNING_5, //placeholder for UDF warnings
+	UDF_WARNING_6, //placeholder for UDF warnings
+	UDF_WARNING_7, //placeholder for UDF warnings
+	UDF_WARNING_8, //placeholder for UDF warnings
+	UDF_WARNING_9, //placeholder for UDF warnings
+	UDF_WARNING_10, //placeholder for UDF warnings
+	UDF_WARNING_11,	//placeholder for UDF warnings
+	UDF_WARNING_12,	//placeholder for UDF warnings
+	UNABLE_TO_CREATE_FILE_TO_SPILL,
+	UNABLE_TO_SPILL,
+	UNABLE_TO_CLOSE_SPILL_FILE,
+	UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED,
+	USING_OVERLOADED_FUNCTION;
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Feb 25 23:08:07 2009
@@ -200,7 +200,7 @@
         }
         catch (IOException e) {
             int errCode = 6009;
-            String msg = "Failed to create job client";
+            String msg = "Failed to create job client:" + e.getMessage();
             throw new ExecException(msg, errCode, PigException.BUG, e);
         }
     }
@@ -549,10 +549,19 @@
             //this should return as soon as connection is shutdown
             int rc = p.waitFor();
             if (rc != 0) {
-                String errMsg = new String();
+                StringBuilder errMsg = new StringBuilder();
                 try {
-                    BufferedReader br = new BufferedReader(new InputStreamReader(p.getErrorStream()));
-                    errMsg = br.readLine();
+                    BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
+                    String line = null;
+                    while((line = br.readLine()) != null) {
+                        errMsg.append(line);
+                    }
+                    br.close();
+                    br = new BufferedReader(new InputStreamReader(p.getErrorStream()));
+                    line = null;
+                    while((line = br.readLine()) != null) {
+                        errMsg.append(line);
+                    }
                     br.close();
                 } catch (IOException ioe) {}
                 int errCode = 6011;
@@ -563,7 +572,7 @@
                 msg.append("; return code: ");
                 msg.append(rc);
                 msg.append("; error: ");
-                msg.append(errMsg);
+                msg.append(errMsg.toString());
                 throw new ExecException(msg.toString(), errCode, PigException.REMOTE_ENVIRONMENT);
             }
         } catch (Exception e){

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Wed Feb 25 23:08:07 2009
@@ -25,6 +25,7 @@
 
 import org.apache.pig.PigException;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigWarning;
 import org.apache.pig.data.DataType;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
@@ -42,6 +43,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -49,6 +51,7 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 
 /**
@@ -106,10 +109,24 @@
     private byte mKeyType = 0;
     
     private String chunkSize;
+    
+    private CompilationMessageCollector messageCollector = null;
 
     public CombinerOptimizer(MROperPlan plan, String chunkSize) {
         super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
         this.chunkSize = chunkSize;
+        messageCollector = new CompilationMessageCollector() ; 
+    }
+
+    public CombinerOptimizer(MROperPlan plan, String chunkSize, 
+    		CompilationMessageCollector messageCollector) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+        this.chunkSize = chunkSize;
+        this.messageCollector = messageCollector ; 
+    }
+    
+    public CompilationMessageCollector getMessageCollector() {
+    	return messageCollector;
     }
 
     @Override
@@ -121,7 +138,7 @@
         // Find the POLocalRearrange in the map.  I'll need it later.
         List<PhysicalOperator> mapLeaves = mr.mapPlan.getLeaves();
         if (mapLeaves == null || mapLeaves.size() != 1) {
-            log.warn("Expected map to have single leaf!");
+            messageCollector.collect("Expected map to have single leaf!", MessageType.Warning, PigWarning.MULTI_LEAF_MAP);
             return;
         }
         PhysicalOperator mapLeaf = mapLeaves.get(0);
@@ -132,7 +149,7 @@
 
         List<PhysicalOperator> reduceRoots = mr.reducePlan.getRoots();
         if (reduceRoots.size() != 1) {
-            log.warn("Expected reduce to have single leaf");
+        	messageCollector.collect("Expected reduce to have single leaf", MessageType.Warning, PigWarning.MULTI_LEAF_REDUCE);
             return;
         }
 
@@ -140,7 +157,7 @@
         // not, I don't know what's going on, so I'm out of here.
         PhysicalOperator root = reduceRoots.get(0);
         if (!(root instanceof POPackage)) {
-            log.warn("Expected reduce root to be a POPackage");
+        	messageCollector.collect("Expected reduce root to be a POPackage", MessageType.Warning, PigWarning.NON_PACKAGE_REDUCE_PLAN_ROOT);
             return;
         }
         POPackage pack = (POPackage)root;
@@ -197,8 +214,8 @@
 				// udf will be a POProject which will project the column
 				// corresponding to the position of the udf in the foreach
                 if (mr.combinePlan.getRoots().size() != 0) {
-                    log.warn("Wasn't expecting to find anything already "
-                        + "in the combiner!");
+                	messageCollector.collect("Wasn't expecting to find anything already "
+                        + "in the combiner!", MessageType.Warning, PigWarning.NON_EMPTY_COMBINE_PLAN);
                     return;
                 }
                 mr.combinePlan = new PhysicalPlan();

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Wed Feb 25 23:08:07 2009
@@ -327,12 +327,16 @@
             	 * the regex matcher ends at one position beyond the match
             	 * in this case it will end at colon (:)
             	 * the exception message will have a preceding space (after the colon (:)) 
-            	 */            	
-            	exceptionMessage = stackTraceLines[startingLineNum].substring(exceptionNameMatcher.end() + 2);
-            	++startingLineNum;
+            	 */ 
             	if (exceptionName.contains(OOM_ERR)) {
             	    outOfMemory = true;
             	}
+            	
+            	if(stackTraceLines[startingLineNum].length() > exceptionNameMatcher.end()) {
+	            	exceptionMessage = stackTraceLines[startingLineNum].substring(exceptionNameMatcher.end() + 2);
+            	}
+            	
+            	++startingLineNum;
             }
         	
             //the exceptionName should not be null

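The added length check guards against a stack-trace line that ends at the exception name, in which case there is no message to extract. A minimal sketch of the same extraction, assuming a simplified exception-name pattern (the real regex lives elsewhere in Launcher):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ExceptionLineDemo {
        public static void main(String[] args) {
            // Simplified stand-in for Launcher's exception-name regex.
            Pattern p = Pattern.compile("[\\w$.]+(Exception|Error)");
            String line = "java.lang.OutOfMemoryError: Java heap space";
            Matcher m = p.matcher(line);
            String exceptionMessage = null;
            if (m.find() && line.length() > m.end() + 1) {
                // m.end() lands on the colon; skip ": " to reach the message.
                exceptionMessage = line.substring(m.end() + 2);
            }
            System.out.println(exceptionMessage); // prints: Java heap space
        }
    }
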
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Feb 25 23:08:07 2009
@@ -32,6 +32,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
@@ -65,6 +66,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.Operator;
@@ -72,6 +74,7 @@
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.util.Pair;
 
 /**
@@ -144,6 +147,8 @@
     
     private UDFFinder udfFinder;
     
+    private CompilationMessageCollector messageCollector = null;
+    
     public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
         this(plan,null);
     }
@@ -166,6 +171,7 @@
         	throw new MRCompilerException(msg, errCode, PigException.BUG);
         }
         scope = roots.get(0).getOperatorKey().getScope();
+        messageCollector = new CompilationMessageCollector() ;
     }
     
     public void randomizeFileLocalizer(){
@@ -188,6 +194,10 @@
         return plan;
     }
     
+    public CompilationMessageCollector getMessageCollector() {
+    	return messageCollector;
+    }
+    
     /**
      * The front-end method that the user calls to compile
      * the plan. Assumes that all submitted plans have a Store
@@ -728,13 +738,15 @@
                 }
                 else
                 {
-                    log.warn("Something in the reduce plan while map plan is not done. Something wrong!");
+                    messageCollector.collect("Something in the reduce plan while map plan is not done. Something wrong!", 
+                    		MessageType.Warning, PigWarning.REDUCE_PLAN_NOT_EMPTY_WHILE_MAP_PLAN_UNDER_PROCESS);
                 }
             } else if (mro.isMapDone() && !mro.isReduceDone()) {
             	// limit should add into reduce plan
                 mro.reducePlan.addAsLeaf(op);
             } else {
-                log.warn("Both map and reduce phases have been done. This is unexpected while compiling!");
+            	messageCollector.collect("Both map and reduce phases have been done. This is unexpected while compiling!",
+            			MessageType.Warning, PigWarning.UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED);
             }
         }catch(Exception e){
             int errCode = 2034;
@@ -1553,8 +1565,9 @@
             if (succMpLeaves == null || succMpLeaves.size() > 1 ||
                     succMpRoots == null || succMpRoots.size() > 1 ||
                     succMpLeaves.get(0) != succMpRoots.get(0)) {
-                log.warn("Expected to find subsequent map " +
-                    "with just a load, but didn't");
+            		messageCollector.collect("Expected to find subsequent map " +
+                    "with just a load, but didn't",
+                    MessageType.Warning, PigWarning.DID_NOT_FIND_LOAD_ONLY_MAP_PLAN);
                 return;
             }
             PhysicalOperator load = succMpRoots.get(0);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Feb 25 23:08:07 2009
@@ -20,15 +20,22 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -41,8 +48,11 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.CompilationMessageCollector.Message;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.util.ConfigurationValidator;
 
 /**
@@ -54,6 +64,7 @@
  
     //used to track the exception thrown by the job control which is run in a separate thread
     private Exception jobControlException = null;
+    private boolean aggregateWarning = false;
     
     @Override
     public boolean launchPig(PhysicalPlan php,
@@ -65,6 +76,7 @@
                                                    JobCreationException,
                                                    Exception {
         long sleepTime = 5000;
+        aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
         MROperPlan mrp = compile(php, pc);
         
         ExecutionEngine exe = pc.getExecutionEngine();
@@ -124,13 +136,23 @@
             return false;
         }
 
+        Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
+                
         List<Job> succJobs = jc.getSuccessfulJobs();
         if(succJobs!=null)
             for(Job job : succJobs){
                 getStats(job,jobClient, false, pc);
+                if(aggregateWarning) {
+                	computeWarningAggregate(job, jobClient, warningAggMap);
+                }
             }
 
-        jc.stop(); 
+        jc.stop();
+        
+        if(aggregateWarning) {
+        	CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
+        }
+
         log.info( "100% complete");
         log.info("Success!");
         return true;
@@ -156,6 +178,10 @@
         comp.randomizeFileLocalizer();
         comp.compile();
         MROperPlan plan = comp.getMRPlan();
+        
+        //display the warning message(s) from the MRCompiler
+        comp.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
+        
         String lastInputChunkSize = 
             pc.getProperties().getProperty(
                     "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
@@ -163,6 +189,8 @@
         if (!("true".equals(prop)))  {
             CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
             co.visit();
+            //display the warning message(s) from the CombinerOptimizer
+            co.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
         }
         
         // optimize key - value handling in package
@@ -207,5 +235,23 @@
     		}
     	}
     }
- 
+    
+    void computeWarningAggregate(Job job, JobClient jobClient, Map<Enum, Long> aggMap) {
+    	JobID mapRedJobID = job.getAssignedJobID();
+    	RunningJob runningJob = null;
+    	try {
+    		runningJob = jobClient.getJob(mapRedJobID);
+    		Counters counters = runningJob.getCounters();
+    		for(Enum e : PigWarning.values()) {
+    			Long currentCount = aggMap.get(e);
+    			currentCount = (currentCount == null? 0 : currentCount);
+    			currentCount += counters.getCounter(e);
+    			aggMap.put(e, currentCount);
+    		}
+    	} catch (IOException ioe) {
+    		String msg = "Unable to retrieve job to compute warning aggregation.";
+    		log.warn(msg);
+    	}    	
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Wed Feb 25 23:08:07 2009
@@ -38,6 +38,7 @@
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
@@ -45,8 +46,10 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.WrappedIOException;
@@ -79,6 +82,9 @@
         PhysicalOperator[] roots;
         PhysicalOperator leaf;
         
+        PigContext pigContext = null;
+        private volatile boolean initialized = false;
+        
         /**
          * Configures the Reduce plan, the POPackage operator
          * and the reporter thread
@@ -110,6 +116,9 @@
                     roots = cp.getRoots().toArray(new PhysicalOperator[1]);
                     leaf = cp.getLeaves().get(0);
                 }
+                
+                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
+                
             } catch (IOException ioe) {
                 String msg = "Problem while configuring combiner's reduce plan.";
                 throw new RuntimeException(msg, ioe);
@@ -127,8 +136,18 @@
                 OutputCollector<PigNullableWritable, Writable> oc,
                 Reporter reporter) throws IOException {
             
-            pigReporter.setRep(reporter);
-            PhysicalOperator.setReporter(pigReporter);
+        	if(!initialized) {
+        		initialized = true;
+	            pigReporter.setRep(reporter);	            
+	            PhysicalOperator.setReporter(pigReporter);
+
+	            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+
+	            PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+	            pigHadoopLogger.setAggregate(aggregateWarning);
+	            pigHadoopLogger.setReporter(reporter);
+	            PhysicalOperator.setPigLogger(pigHadoopLogger);
+        	}
             
             // In the case we optimize, we combine
             // POPackage and POForeach - so we could get many

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=747954&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Wed Feb 25 23:08:07 2009
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
+
+/**
+ * 
+ * A singleton class that implements the PigLogger interface
+ * for use in the map-reduce context. Provides the ability to
+ * aggregate warning messages.
+ */
+public final class PigHadoopLogger implements PigLogger {
+	private static PigHadoopLogger instance = new PigHadoopLogger();  
+    
+	public static synchronized PigHadoopLogger getInstance() {
+		if (instance == null) {
+			instance = new PigHadoopLogger();
+		}
+		return instance;
+	}	
+
+	private static Log log = LogFactory.getLog(PigHadoopLogger.class);
+	private Reporter reporter = null;
+	private boolean aggregate = false;
+
+    private PigHadoopLogger() {
+    }    
+
+    public void warn(Object o, String msg, Enum warningEnum) {
+    	String displayMessage = o.getClass().getName() + ": " + msg;
+    	if(aggregate) {
+    		if(reporter != null) {
+    			reporter.incrCounter(warningEnum, 1);
+    		} else {
+    			//TODO:    			
+    			//in local mode of execution if the PigHadoopLogger is used initially,
+    			//then aggregation cannot be performed as the reporter will be null. 
+    			//The reference to a reporter is given by Hadoop at run time. 
+    			//In local mode, due to the absence of Hadoop there will be no reporter
+    			//Just print the warning message as is.
+    			//If a warning message is printed in map reduce mode when aggregation
+    			//is turned on then we have a problem, it's a bug.
+    			log.warn(displayMessage);
+    		}
+    	} else {
+        	log.warn(displayMessage);
+    	}
+    }    
+
+    public Reporter getReporter() {
+        return reporter;
+    }
+
+    public synchronized void setReporter(Reporter rep) {
+    		this.reporter = rep;
+    }
+    
+    public boolean getAggregate() {
+    	return aggregate;
+    }
+    
+    public synchronized void setAggregate(boolean aggregate) {
+    		this.aggregate = aggregate;
+    }
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Wed Feb 25 23:08:07 2009
@@ -34,6 +34,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -71,7 +72,8 @@
 
     private PhysicalOperator leaf;
 
-    private boolean initialized = false;
+    PigContext pigContext = null;
+    private volatile boolean initialized = false;
     
     /**
      * Will be called when all the tuples in the input
@@ -106,7 +108,9 @@
         try {
             finisher.visit();
         } catch (VisitorException e) {
-            throw new IOException("Error trying to finish UDFs",e);
+        	int errCode = 2121;
+        	String msg = "Error while calling finish method on UDFs.";
+            throw new VisitorException(msg, errCode, PigException.BUG, e);
         }
         
         mp = null;
@@ -150,6 +154,9 @@
                 roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
                 leaf = mp.getLeaves().get(0);
             }
+            
+            pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+            
         } catch (IOException ioe) {
             String msg = "Problem while configuring map plan.";
             throw new RuntimeException(msg, ioe);
@@ -176,6 +183,13 @@
             this.outputCollector = oc;
             pigReporter.setRep(reporter);
             PhysicalOperator.setReporter(pigReporter);
+            
+            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+
+            PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+            pigHadoopLogger.setAggregate(aggregateWarning);
+            pigHadoopLogger.setReporter(reporter);
+            PhysicalOperator.setPigLogger(pigHadoopLogger);
         }
         
         if(mp.isEmpty()){

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Wed Feb 25 23:08:07 2009
@@ -46,6 +46,7 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -159,6 +160,10 @@
         
         PhysicalOperator[] roots;
         private PhysicalOperator leaf;
+        
+        PigContext pigContext = null;
+        protected volatile boolean initialized = false;
+        
         /**
          * Configures the Reduce plan, the POPackage operator
          * and the reporter thread
@@ -189,6 +194,9 @@
                     roots = rp.getRoots().toArray(new PhysicalOperator[1]);
                     leaf = rp.getLeaves().get(0);
                 }
+                
+                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
+                
             } catch (IOException ioe) {
                 String msg = "Problem while configuring reduce plan.";
                 throw new RuntimeException(msg, ioe);
@@ -204,13 +212,23 @@
         public void reduce(PigNullableWritable key,
                 Iterator<NullableTuple> tupIter,
                 OutputCollector<PigNullableWritable, Writable> oc,
-                Reporter reporter) throws IOException {
-            
-            // cache the collector for use in runPipeline()
-            // which could additionally be called from close()
-            this.outputCollector = oc;
-            pigReporter.setRep(reporter);
-            PhysicalOperator.setReporter(pigReporter);
+                Reporter reporter) throws IOException {            
+
+            if(!initialized) {
+                initialized  = true;
+                // cache the collector for use in runPipeline() which
+                // can be called from close()
+                this.outputCollector = oc;
+	            pigReporter.setRep(reporter);
+	            PhysicalOperator.setReporter(pigReporter);
+	            
+	            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+	            
+	            PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+	            pigHadoopLogger.setAggregate(aggregateWarning);
+	            pigHadoopLogger.setReporter(reporter);
+	            PhysicalOperator.setPigLogger(pigHadoopLogger);
+            }
 
             // In the case we optimize the join, we combine
             // POPackage and POForeach - so we could get many
@@ -391,11 +409,21 @@
                 OutputCollector<PigNullableWritable, Writable> oc,
                 Reporter reporter) throws IOException {
             
-            // cache the collector for use in runPipeline()
-            // which could additionally be called from close()
-            this.outputCollector = oc;
-            pigReporter.setRep(reporter);
-            PhysicalOperator.setReporter(pigReporter);
+            if(!initialized) {
+                initialized  = true;
+	            // cache the collector for use in runPipeline()
+	            // which could additionally be called from close()
+	            this.outputCollector = oc;
+	            pigReporter.setRep(reporter);	            
+	            PhysicalOperator.setReporter(pigReporter);
+	            
+	            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+
+	            PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+	            pigHadoopLogger.setAggregate(aggregateWarning);
+	            pigHadoopLogger.setReporter(reporter);
+	            PhysicalOperator.setPigLogger(pigHadoopLogger);
+            }
             
             // If the keyType is not a tuple, the MapWithComparator.collect()
             // would have wrapped the key into a tuple so that the 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Wed Feb 25 23:08:07 2009
@@ -94,6 +94,11 @@
     // wrap their own version of a reporter.
     public static PigProgressable reporter;
 
+    // Will be used by operators to aggregate warning messages
+    // Should be set by the backends to appropriate implementations that
+    // wrap their own version of a logger.
+    protected static PigLogger pigLogger;
+
     // Dummy types used to access the getNext of appropriate
     // type. These will be null
     static protected DataByteArray dummyDBA;
@@ -310,5 +315,17 @@
     public void setParentPlan(PhysicalPlan physicalPlan) {
        parentPlan = physicalPlan;
     }
+    
+    public Log getLogger() {
+    	return log;
+    }
+    
+    public static void setPigLogger(PigLogger logger) {
+    	pigLogger = logger;
+    }
+    
+    public static PigLogger getPigLogger() {
+    	return pigLogger;
+    }
 
 }

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java?rev=747954&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java Wed Feb 25 23:08:07 2009
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer;
+
+/**
+ * 
+ * An interface to allow aggregation of messages
+ */
+public interface PigLogger {
+    
+	/**
+	 * Report a warning message; implementations may aggregate counts by warningEnum.
+	 */
+    public void warn(Object o, String msg, Enum warningEnum);
+}

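LocalPigLauncher below falls back to PigHadoopLogger because no local-mode implementation exists yet (see its TODO). A hypothetical PigLocalLogger, not part of this commit, would only need the single method:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;

    // Hypothetical local-mode logger: with no Hadoop reporter available
    // there are no counters to increment, so warnings are logged directly
    // and no aggregation is attempted.
    public class PigLocalLogger implements PigLogger {
        private static final Log log = LogFactory.getLog(PigLocalLogger.class);

        public void warn(Object o, String msg, Enum warningEnum) {
            log.warn(o.getClass().getName() + " [" + warningEnum + "]: " + msg);
        }
    }
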
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java Wed Feb 25 23:08:07 2009
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -70,8 +71,12 @@
         }
         right = (Double) res.result;
         
-        if (right == 0)
+        if (right == 0) {
+        	if(pigLogger != null) {
+        		pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+        	}
             res.result = null;
+        }
         else
             res.result = new Double(left / right);
         return res;
@@ -96,8 +101,12 @@
         }
         right = (Float) res.result;
         
-        if (right == 0)
+        if (right == 0) {
+        	if(pigLogger != null) {
+        		pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+        	}
             res.result = null;
+        }
         else
             res.result = new Float(left / right);
         return res;
@@ -122,8 +131,12 @@
         }
         right = (Integer) res.result;
         
-        if (right == 0)
-            res.result = null;
+        if (right == 0) {
+        	if(pigLogger != null) {
+        		pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+        	}
+        	res.result = null;
+        }
         else
             res.result = new Integer(left / right);
         return res;
@@ -148,8 +161,12 @@
         }
         right = (Long) res.result;
         
-        if (right == 0)
+        if (right == 0) {
+        	if(pigLogger != null) {
+        		pigLogger.warn(this, "Divide by zero. Converting it to NULL.", PigWarning.DIVIDE_BY_ZERO);
+        	}
             res.result = null;
+        }
         else
             res.result = new Long(left / right);
         return res;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Wed Feb 25 23:08:07 2009
@@ -39,6 +39,8 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -61,6 +63,7 @@
 	public static final byte INITIAL = 0;
 	public static final byte INTERMEDIATE = 1;
 	public static final byte FINAL = 2;
+	private boolean initialized = false;
 
 	public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
 		super(k, rp);
@@ -92,7 +95,15 @@
 
 	private void instantiateFunc(FuncSpec fSpec) {
 		this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
+		//the next couple of initializations do not work as intended for the following reasons
+		//the reporter and pigLogger are member variables of PhysicalOperator
+		//when instantiateFunc is invoked at deserialization time, both
+		//reporter and pigLogger are null. They are set during map and reduce calls,
+		//making the initializations here basically useless. Look at the processInput
+		//method where these variables are re-initialized. At that point, the PhysicalOperator
+		//is set up correctly with the reporter and pigLogger references
         this.func.setReporter(reporter);
+        this.func.setPigLogger(pigLogger);
 	}
 	
 	public Result processInput() throws ExecException {
@@ -101,7 +112,11 @@
         // across in the serialization (don't know why).  I suspect it's as
         // cheap to call the setReporter call everytime as to check whether I
         // have (hopefully java will inline it).
-        func.setReporter(reporter);
+        if(!initialized) {
+        	func.setReporter(reporter);
+        	func.setPigLogger(pigLogger);
+        	initialized = true;
+        }
 
 		Result res = new Result();
 		Tuple inpValue = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java Wed Feb 25 23:08:07 2009
@@ -27,18 +27,16 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class LocalPigLauncher extends Launcher {
@@ -57,8 +55,11 @@
     public boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
             throws PlanException, VisitorException, IOException, ExecException,
             JobCreationException {
-        // TODO Auto-generated method stub
-        List<PhysicalOperator> stores = php.getLeaves();
+        //TODO
+        //Until a PigLocalLogger is implemented, set up a PigHadoopLogger
+        PhysicalOperator.setPigLogger(PigHadoopLogger.getInstance());
+
+        List<PhysicalOperator> stores = php.getLeaves();
         int noJobs = stores.size();
         int failedJobs = 0;
 

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java Wed Feb 25 23:08:07 2009
@@ -34,7 +34,6 @@
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.WrappedIOException;
 
 
 /**
@@ -99,7 +98,7 @@
 
         @Override
         public Tuple exec(Tuple input) throws IOException {
-            try {
+        	try {
                 return tfact.newTuple(sumDoubles(input));
             } catch (ExecException ee) {
                 throw ee;

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java Wed Feb 25 23:08:07 2009
@@ -17,25 +17,19 @@
  */
 package org.apache.pig.builtin;
 
-import java.awt.image.VolatileImage;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.io.ByteArrayInputStream;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.io.DataOutputBuffer;
 
 import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -57,6 +51,8 @@
     private Integer mMaxInt = new Integer(Integer.MAX_VALUE);
     private Long mMaxLong = new Long(Long.MAX_VALUE);
     private TextDataParser dataParser = null;
+    
+    private PigLogger pigLogger = PhysicalOperator.getPigLogger();
         
     public Utf8StorageConverter() {
     }
@@ -97,9 +93,9 @@
         try {
             return Double.valueOf(new String(b));
         } catch (NumberFormatException nfe) {
-            mLog.warn("Unable to interpret value " + b + " in field being " +
+            warn("Unable to interpret value " + b + " in field being " +
                     "converted to double, caught NumberFormatException <" +
-                    nfe.getMessage() + "> field discarded");
+                    nfe.getMessage() + "> field discarded", PigWarning.FIELD_DISCARDED);
             return null;
         }
     }
@@ -119,9 +115,9 @@
         try {
             return Float.valueOf(s);
         } catch (NumberFormatException nfe) {
-            mLog.warn("Unable to interpret value " + b + " in field being " +
+            warn("Unable to interpret value " + b + " in field being " +
                     "converted to float, caught NumberFormatException <" +
-                    nfe.getMessage() + "> field discarded");
+                    nfe.getMessage() + "> field discarded", PigWarning.FIELD_DISCARDED);
             return null;
         }
     }
@@ -141,14 +137,14 @@
                 Double d = Double.valueOf(s);
                 // Need to check for an overflow error
                 if (d.doubleValue() > mMaxInt.doubleValue() + 1.0) {
-                    mLog.warn("Value " + d + " too large for integer");
+                    warn("Value " + d + " too large for integer", PigWarning.TOO_LARGE_FOR_INT);
                     return null;
                 }
                 return new Integer(d.intValue());
             } catch (NumberFormatException nfe2) {
-                mLog.warn("Unable to interpret value " + b + " in field being " +
+                warn("Unable to interpret value " + b + " in field being " +
                         "converted to int, caught NumberFormatException <" +
-                        nfe.getMessage() + "> field discarded");
+                        nfe.getMessage() + "> field discarded", PigWarning.FIELD_DISCARDED);
                 return null;
             }
         }
@@ -178,14 +174,14 @@
                 Double d = Double.valueOf(s);
                 // Need to check for an overflow error
                 if (d.doubleValue() > mMaxLong.doubleValue() + 1.0) {
-                    mLog.warn("Value " + d + " too large for integer");
+                    warn("Value " + d + " too large for long", PigWarning.TOO_LARGE_FOR_INT);
                     return null;
                 }
                 return new Long(d.longValue());
             } catch (NumberFormatException nfe2) {
-                mLog.warn("Unable to interpret value " + b + " in field being " +
+                warn("Unable to interpret value " + b + " in field being " +
                         "converted to long, caught NumberFormatException <" +
-                        nfe.getMessage() + "> field discarded");
+                        nfe.getMessage() + "> field discarded", PigWarning.FIELD_DISCARDED);
                 return null;
             }
         }
@@ -252,5 +248,13 @@
         return t.toString().getBytes();
     }
     
+    protected void warn(String msg, Enum warningEnum) {
+        pigLogger = PhysicalOperator.getPigLogger();
+        if(pigLogger != null) {
+            pigLogger.warn(this, msg, warningEnum);
+        } else {
+            mLog.warn(msg);
+        }
+    }
 
 }
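
The protected warn() helper just added is the idiom this patch repeats across several classes: prefer the PigLogger installed on PhysicalOperator, so identical warnings can be aggregated by kind, and fall back to commons-logging when none is set, e.g. on the client side. A compilable sketch of the idiom; the commons-logging calls are real, while the nested PigLogger stub and the class name are assumptions standing in for the Pig types:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    class WarnFallbackSketch {
        // stand-in for org.apache.pig...physicalLayer.PigLogger
        interface PigLogger { void warn(Object src, String msg, Enum kind); }

        private static final Log mLog = LogFactory.getLog(WarnFallbackSketch.class);
        private PigLogger pigLogger;   // re-fetched before every warn in the real code

        protected void warn(String msg, Enum warningEnum) {
            if (pigLogger != null) {
                pigLogger.warn(this, msg, warningEnum);   // typed, aggregatable
            } else {
                mLog.warn(msg);                           // plain one-off log line
            }
        }
    }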

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Wed Feb 25 23:08:07 2009
@@ -31,6 +31,7 @@
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.impl.util.Spillable;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +43,8 @@
 public abstract class DefaultAbstractBag implements DataBag {
 
      private static final Log log = LogFactory.getLog(DataBag.class);
+     
+     private static PigLogger pigLogger = PhysicalOperator.getPigLogger();
 
     // Container that holds the tuples. Actual object instantiated by
     // subclasses.
@@ -351,6 +354,15 @@
         }
     }
 
+    protected void warn(String msg, Enum warningEnum, Exception e) {
+        pigLogger = PhysicalOperator.getPigLogger();
+        if(pigLogger != null) {
+            pigLogger.warn(this, msg, warningEnum);
+        } else {
+            log.warn(msg, e);
+        }
+    }
+
     public static abstract class BagDelimiterTuple extends DefaultTuple{}
     public static class StartBag extends BagDelimiterTuple{}
     

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java Wed Feb 25 23:08:07 2009
@@ -30,6 +30,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigWarning;
 
 
 /**
@@ -47,7 +48,7 @@
     private static TupleFactory gTupleFactory = TupleFactory.getInstance();
 
     private static final Log log = LogFactory.getLog(DefaultDataBag.class);
- 
+    
     public DefaultDataBag() {
         mContents = new ArrayList<Tuple>();
     }
@@ -90,8 +91,8 @@
             }  catch (IOException ioe) {
                 // Do not remove last file from spilled array. It was not
                 // added as File.createTmpFile threw an IOException
-                log.warn(
-                    "Unable to create tmp file to spill to disk", ioe);
+                warn(
+                    "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
                 return 0;
             }
             try {
@@ -107,15 +108,15 @@
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
-                log.warn(
-                    "Unable to spill contents to disk", ioe);
+                warn(
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
                 return 0;
             } finally {
                 if (out != null) {
                     try {
                         out.close();
                     } catch (IOException e) {
-                        log.warn("Error closing spill", e);
+                        warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
                     }
                 }
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Wed Feb 25 23:08:07 2009
@@ -36,6 +36,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigWarning;
 
 
 
@@ -135,8 +136,8 @@
             }  catch (IOException ioe) {
                 // Do not remove last file from spilled array. It was not
                 // added as File.createTmpFile threw an IOException
-                log.warn(
-                    "Unable to create tmp file to spill to disk", ioe);
+                warn(
+                    "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
                 return 0;
             }
             try {
@@ -167,15 +168,15 @@
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
-                log.warn(
-                    "Unable to spill contents to disk", ioe);
+                warn(
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
                 return 0;
             } finally {
                 if (out != null) {
                     try {
                         out.close();
                     } catch (IOException e) {
-                        log.warn("Error closing spill", e);
+                        warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
                     }
                 }
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Wed Feb 25 23:08:07 2009
@@ -35,6 +35,7 @@
   
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigWarning;
 
 
 /**
@@ -108,8 +109,8 @@
             } catch (IOException ioe) {
                 // Do not remove last file from spilled array. It was not
                 // added as File.createTmpFile threw an IOException
-                log.warn(
-                    "Unable to create tmp file to spill to disk", ioe);
+                warn(
+                    "Unable to create tmp file to spill to disk", PigWarning.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
                 return 0;
             }
             try {
@@ -135,15 +136,15 @@
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
-                log.warn(
-                    "Unable to spill contents to disk", ioe);
+                warn(
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
                 return 0;
             } finally {
                 if (out != null) {
                     try {
                         out.close();
                     } catch (IOException e) {
-                        log.warn("Error closing spill", e);
+                        warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
                     }
                 }
             }
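
The same mechanical change lands in DefaultDataBag, DistinctDataBag and SortedDataBag above: each stage of a spill gets its own warning kind, so a post-run summary can report how many spills failed at create, write or close time. A self-contained sketch of that three-stage classification; the Kind enum and the warn() body are stand-ins, but the stage structure mirrors the hunks:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class SpillSketch {
        enum Kind { UNABLE_TO_CREATE_FILE_TO_SPILL, UNABLE_TO_SPILL,
                    UNABLE_TO_CLOSE_SPILL_FILE }

        long spill(Iterable<String> contents) {
            File f;
            try {
                f = File.createTempFile("pigbag", null);   // stage 1: create
            } catch (IOException ioe) {
                warn("Unable to create tmp file to spill to disk",
                     Kind.UNABLE_TO_CREATE_FILE_TO_SPILL, ioe);
                return 0;
            }
            OutputStream out = null;
            long spilled = 0;
            try {
                out = new FileOutputStream(f);             // stage 2: write
                for (String s : contents) {
                    out.write(s.getBytes("UTF-8"));
                    spilled++;
                }
            } catch (IOException ioe) {
                warn("Unable to spill contents to disk", Kind.UNABLE_TO_SPILL, ioe);
                return 0;
            } finally {
                if (out != null) {
                    try {
                        out.close();                       // stage 3: close
                    } catch (IOException e) {
                        warn("Error closing spill", Kind.UNABLE_TO_CLOSE_SPILL_FILE, e);
                    }
                }
            }
            return spilled;
        }

        void warn(String msg, Kind kind, Exception e) {
            // the real helper prefers the installed PigLogger, as shown earlier
            System.err.println(kind + ": " + msg + " (" + e + ")");
        }
    }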

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Wed Feb 25 23:08:07 2009
@@ -37,6 +37,7 @@
 import org.apache.pig.LoadFunc;
 import org.apache.pig.Algebraic;
 import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.ExpressionOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -215,113 +216,6 @@
             msgCollector.collect(msg, MessageType.Error);
             throw new TypeCheckerException(msg, errCode, PigException.INPUT, fe) ;
         }
-
-        /*
-        if (!pj.getSentinel()) {
-
-            LogicalOperator op = pj.getExpression() ;
-
-            if (!(op instanceof LOProject)) {
-                throw new AssertionError("LOProject.getExpression() has to be "
-                                         + "LOProject if it's not a sentinel") ;
-            }
-
-            // else
-            LOProject innerProject = (LOProject) op ;
-            resolveLOProjectType(innerProject) ;
-
-            if ( (innerProject.getType() != DataType.BAG) &&
-                 (innerProject.getType() != DataType.TUPLE) ) {
-                throw new AssertionError("Nested LOProject is for extracting "
-                                         + " from TUPLE/BAG only") ;
-            }
-
-            // set type of this project
-            pj.setType(innerProject.getType());
-            Schema inputSchema = null ;
-
-            try {
-                inputSchema = innerProject.getSchema() ;
-            }
-            catch (FrontendException fe) {
-                String msg = "Cannot get source schema into LOProject" ;
-                msgCollector.collect(msg, MessageType.Error);
-                VisitorException vse = new VisitorException(msg) ;
-                vse.initCause(fe) ;
-                throw new VisitorException(msg) ;
-            }
-
-            // extracting schema from projection
-            List<FieldSchema> fsList = new ArrayList<FieldSchema>() ;
-            try {
-                for(int index: pj.getProjection()) {
-                    FieldSchema fs = null ;
-                    // typed input
-                    if (inputSchema != null) {
-                        fs = inputSchema.getField(index) ;
-                        FieldSchema newFs = new FieldSchema(fs.alias, fs.schema, fs.type) ;
-                        fsList.add(newFs) ;
-                    }
-                    // non-typed input
-                    else {
-                        FieldSchema newFs = new FieldSchema(null, DataType.BYTEARRAY) ;
-                        fsList.add(newFs) ;
-                    }
-                }
-                pj.setFieldSchema(new FieldSchema(null, new Schema(fsList), innerProject.getType()));
-            }
-            catch (FrontendException fe) {
-                String msg = "Cannot get source schema into LOProject" ;
-                msgCollector.collect(msg, MessageType.Error);
-                VisitorException vse = new VisitorException(msg) ;
-                vse.initCause(fe) ;
-                throw new VisitorException(msg) ;
-            }
-            catch (ParseException pe) {
-                String msg = "Cannot get source schema into LOProject" ;
-                msgCollector.collect(msg, MessageType.Error);
-                VisitorException vse = new VisitorException(msg) ;
-                vse.initCause(pe) ;
-                throw new VisitorException(msg) ;
-            }
-        }
-        // if it's a sentinel, we just get the projected input type to it
-        else {
-            if (pj.getProjection().size() != 1) {
-                throw new AssertionError("Sentinel LOProject can have only "
-                                         + "1 projection") ;
-            }
-            LogicalOperator input = pj.getExpression() ;
-            int projectedField = pj.getProjection().get(0) ;
-            try {
-                Schema schema = input.getSchema() ;
-
-                if (schema != null) {
-                    FieldSchema fs = schema.getField(projectedField) ;
-                    pj.setFieldSchema(fs);
-                }
-                else {
-                    FieldSchema fs = new FieldSchema(null, DataType.BYTEARRAY) ;
-                    pj.setFieldSchema(fs);
-                }
-            }
-            catch (FrontendException fe) {
-                String msg = "Cannot get source schema into LOProject" ;
-                msgCollector.collect(msg, MessageType.Error);
-                VisitorException vse = new VisitorException(msg) ;
-                vse.initCause(fe) ;
-                throw new VisitorException(msg) ;
-            }
-            catch (ParseException pe) {
-                String msg = "Cannot get source schema into LOProject" ;
-                msgCollector.collect(msg, MessageType.Error);
-                VisitorException vse = new VisitorException(msg) ;
-                vse.initCause(pe) ;
-                throw new VisitorException(msg) ;
-            }
-        }
-        */
-        
     }
 
     /**
@@ -1391,7 +1285,7 @@
                              " will be called with following argument types: " +
                              matchingSpec.getInputArgsSchema() + ". If you want to use " +
                              "different input argument types, please use explicit casts.";
-                msgCollector.collect(msg, MessageType.Warning);
+                msgCollector.collect(msg, MessageType.Warning, PigWarning.USING_OVERLOADED_FUNCTION);
             }
             func.setFuncSpec(matchingSpec);
             insertCastsForUDF(func, s, matchingSpec.getInputArgsSchema());
@@ -2723,7 +2617,7 @@
                 // We just warn about mismatch type in non-strict mode
                 if (!strictMode) {
                     String msg = "COGroup by incompatible types results in ByteArray" ;
-                    msgCollector.collect(msg, MessageType.Warning) ;
+                    msgCollector.collect(msg, MessageType.Warning, PigWarning.GROUP_BY_INCOMPATIBLE_TYPES) ;
                     groupType = DataType.BYTEARRAY ;
                 }
                 // We just die if in strict mode
@@ -2780,7 +2674,7 @@
                     // We just warn about mismatch type in non-strict mode
                     if (!strictMode) {
                         String msg = "COGroup by incompatible types results in ByteArray" ;
-                        msgCollector.collect(msg, MessageType.Warning) ;
+                        msgCollector.collect(msg, MessageType.Warning, PigWarning.GROUP_BY_INCOMPATIBLE_TYPES) ;
                         fsList.get(j).type = DataType.BYTEARRAY ;
                     }
                     // We just die if in strict mode
@@ -3138,9 +3032,36 @@
         String originalTypeName = DataType.findTypeName(originalType) ;
         String toTypeName = DataType.findTypeName(toType) ;
         String opName = op.getClass().getSimpleName() ;
+        Enum kind = null;
+        switch(toType) {
+        case DataType.BAG:
+            kind = PigWarning.IMPLICIT_CAST_TO_BAG;
+            break;
+        case DataType.CHARARRAY:
+            kind = PigWarning.IMPLICIT_CAST_TO_CHARARRAY;
+            break;
+        case DataType.DOUBLE:
+            kind = PigWarning.IMPLICIT_CAST_TO_DOUBLE;
+            break;
+        case DataType.FLOAT:
+            kind = PigWarning.IMPLICIT_CAST_TO_FLOAT;
+            break;
+        case DataType.INTEGER:
+            kind = PigWarning.IMPLICIT_CAST_TO_INT;
+            break;
+        case DataType.LONG:
+            kind = PigWarning.IMPLICIT_CAST_TO_LONG;
+            break;
+        case DataType.MAP:
+            kind = PigWarning.IMPLICIT_CAST_TO_MAP;
+            break;
+        case DataType.TUPLE:
+            kind = PigWarning.IMPLICIT_CAST_TO_TUPLE;
+            break;
+        }
         msgCollector.collect(originalTypeName + " is implicitly cast to "
                              + toTypeName +" under " + opName + " Operator",
-                             MessageType.Warning) ;
+                             MessageType.Warning, kind) ;
     }
 
     /***
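
One more note on the visitor change above: the switch simply maps the target DataType code to the matching IMPLICIT_CAST_TO_* kind before handing the message to the collector. If the kind list keeps growing, the same mapping can be table-driven; a sketch under that assumption, where the byte codes and the enum are hypothetical placeholders rather than the real DataType values:

    import java.util.HashMap;
    import java.util.Map;

    class CastWarningKinds {
        enum Kind { IMPLICIT_CAST_TO_BAG, IMPLICIT_CAST_TO_CHARARRAY,
                    IMPLICIT_CAST_TO_DOUBLE, IMPLICIT_CAST_TO_FLOAT,
                    IMPLICIT_CAST_TO_INT, IMPLICIT_CAST_TO_LONG,
                    IMPLICIT_CAST_TO_MAP, IMPLICIT_CAST_TO_TUPLE }

        // hypothetical byte codes standing in for org.apache.pig.data.DataType
        static final byte INTEGER = 1, LONG = 2, FLOAT = 3, DOUBLE = 4,
                          CHARARRAY = 5, MAP = 6, TUPLE = 7, BAG = 8;

        private static final Map<Byte, Kind> KINDS = new HashMap<Byte, Kind>();
        static {
            KINDS.put(BAG, Kind.IMPLICIT_CAST_TO_BAG);
            KINDS.put(CHARARRAY, Kind.IMPLICIT_CAST_TO_CHARARRAY);
            KINDS.put(DOUBLE, Kind.IMPLICIT_CAST_TO_DOUBLE);
            KINDS.put(FLOAT, Kind.IMPLICIT_CAST_TO_FLOAT);
            KINDS.put(INTEGER, Kind.IMPLICIT_CAST_TO_INT);
            KINDS.put(LONG, Kind.IMPLICIT_CAST_TO_LONG);
            KINDS.put(MAP, Kind.IMPLICIT_CAST_TO_MAP);
            KINDS.put(TUPLE, Kind.IMPLICIT_CAST_TO_TUPLE);
        }

        // returns null for types with no dedicated kind, matching the switch
        static Kind kindFor(byte toType) { return KINDS.get(toType); }
    }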

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/CompilationMessageCollector.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/CompilationMessageCollector.java?rev=747954&r1=747953&r2=747954&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/CompilationMessageCollector.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/CompilationMessageCollector.java Wed Feb 25 23:08:07 2009
@@ -17,9 +17,13 @@
  */
 package org.apache.pig.impl.plan ;
 
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ArrayList ;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
 /***
  * This class is used for collecting all messages (error + warning) in 
  * compilation process. These messages are reported back to users 
@@ -40,12 +44,18 @@
     public static class Message {
         private String msg = null ;
         private MessageType msgType = MessageType.Unknown ;
+        private Enum kind = null;
         
         public Message(String message, MessageType messageType) {
             msg = message ;
             msgType = messageType ;
         }
         
+        public Message(String message, MessageType messageType, Enum kind) {
+            this(message, messageType);
+            this.kind = kind;
+        }
+        
         public String getMessage() {
             return msg ;
         }
@@ -53,6 +63,10 @@
         public MessageType getMessageType() {
             return msgType ;
         }
+        
+        public Enum getKind() {
+            return kind;
+        }
     }
     
     private List<Message> messageList = new ArrayList<Message>() ;
@@ -65,15 +79,23 @@
         messageList.add(new Message(message, messageType)) ;
     }
     
-    public boolean hasError() {
+    public void collect(String message, MessageType messageType, Enum kind) {
+        messageList.add(new Message(message, messageType, kind)) ;
+    }
+    
+    protected boolean hasMessageType(MessageType messageType) {
         Iterator<Message> iter = iterator() ;
         while(iter.hasNext()) {
-            if (iter.next().getMessageType() == MessageType.Error) {
+            if (iter.next().getMessageType() == messageType) {
                 return true ;
             }
         }
         return false ;
     }
+    
+    public boolean hasError() {
+        return hasMessageType(MessageType.Error);
+    }
 
     public Iterator<Message> iterator() {
         return messageList.iterator() ;
@@ -91,4 +113,76 @@
         return messageList.get(i) ;
     }
     
+    public Map<Enum, Long> getKindAggregate(MessageType messageType) {
+        Map<Enum, Long> aggMap = new HashMap<Enum, Long>();
+        Iterator<Message> iter = iterator() ;
+        while(iter.hasNext()) {
+            Message message = iter.next();
+            if (message.getMessageType() == messageType) {
+                Enum kind = message.getKind();
+                if(kind != null) {
+                    Long count = aggMap.get(kind);
+                    count = (count == null ? 1 : ++count);
+                    aggMap.put(kind, count);
+                }
+            }
+        }
+        return aggMap;
+    }
+
+    public static void logAggregate(Map<Enum, Long> aggMap, MessageType messageType, Log log) {
+        for(Enum e: aggMap.keySet()) {
+            Long count = aggMap.get(e);
+            if(count != null && count > 0) {
+                String message = "Encountered " + messageType + " " + e.toString() + " " + count + " time(s).";
+                logMessage(message, messageType, log);
+            }
+        }
+    }
+
+    public static void logMessages(CompilationMessageCollector messageCollector,
+            MessageType messageType, boolean aggregate, Log log) {
+        if(aggregate) {
+            Map<Enum, Long> aggMap = messageCollector.getKindAggregate(messageType);
+            logAggregate(aggMap, messageType, log);
+        } else {
+            Iterator<Message> messageIter = messageCollector.iterator();
+            while(messageIter.hasNext()) {
+                Message message = messageIter.next();
+                if(message.getMessageType() == messageType) {
+                    logMessage(message.getMessage(), messageType, log);
+                }
+            }
+        }
+    }
+
+    public void logMessages(MessageType messageType, boolean aggregate, Log log) {
+        logMessages(this, messageType, aggregate, log);
+    }
+
+    public static void logAllMessages(CompilationMessageCollector messageCollector, Log log) {
+        Iterator<Message> messageIter = messageCollector.iterator();
+        while(messageIter.hasNext()) {
+            Message message = messageIter.next();
+            logMessage(message.getMessage(), message.getMessageType(), log);
+        }
+    }
+
+    public void logAllMessages(Log log) {
+        logAllMessages(this, log);
+    }
+
+    private static void logMessage(String messageString, MessageType messageType, Log log) {
+        switch(messageType) {
+        case Info:
+            log.info(messageString);
+            break;
+        case Warning:
+            log.warn(messageString);
+            break;
+        case Error:
+            log.error(messageString);
+        }
+    }
+
 }
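
To close the loop on the collector API added above: collect() now records an optional warning kind, getKindAggregate() counts messages per kind, and logMessages(..., aggregate=true, ...) folds N identical warnings into one "Encountered ... N time(s)." line. A small usage sketch against exactly that API, assuming MessageType is the collector's nested enum as the diff suggests; the DemoWarning enum is hypothetical, since in Pig the kinds come from PigWarning:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.pig.impl.plan.CompilationMessageCollector;
    import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;

    class CollectorDemo {
        // hypothetical kind; real callers pass a PigWarning value
        enum DemoWarning { GROUP_BY_INCOMPATIBLE_TYPES }

        private static final Log log = LogFactory.getLog(CollectorDemo.class);

        public static void main(String[] args) {
            CompilationMessageCollector collector = new CompilationMessageCollector();
            // record the same typed warning three times...
            for (int i = 0; i < 3; i++) {
                collector.collect("COGroup by incompatible types results in ByteArray",
                                  MessageType.Warning,
                                  DemoWarning.GROUP_BY_INCOMPATIBLE_TYPES);
            }
            // ...and log it once, aggregated:
            // "Encountered Warning GROUP_BY_INCOMPATIBLE_TYPES 3 time(s)."
            collector.logMessages(MessageType.Warning, true, log);
        }
    }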


