pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1604515 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
Date Sun, 22 Jun 2014 03:09:56 GMT
Author: daijy
Date: Sun Jun 22 03:09:55 2014
New Revision: 1604515

URL: http://svn.apache.org/r1604515
Log:
PIG-4031: Provide Counter aggregation for Tez

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1604515&r1=1604514&r2=1604515&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Jun 22 03:09:55 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4031: Provide Counter aggregation for Tez (daijy)
+
 PIG-4028: add a flag to control the ivy resolve/retrieve output (gkesavan via daijy)
 
 PIG-4015: Provide a way to disable auto-parallism in tez (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1604515&r1=1604514&r2=1604515&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Sun Jun
22 03:09:55 2014
@@ -20,7 +20,9 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -39,6 +42,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.UDFContext;
@@ -159,6 +163,19 @@ public class TezLauncher extends Launche
 
             notifyFinishedOrFailed(job);
             tezStats.accumulateStats(job);
+            Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
+
+            if (aggregateWarning && job.getJobState() == ControlledJob.State.SUCCESS)
{
+                for (Vertex vertex : job.getDAG().getVertices()) {
+                    String vertexName = vertex.getVertexName();
+                    Map<String, Map<String, Long>> counterGroups = job.getVertexCounters(vertexName);
+                    computeWarningAggregate(counterGroups, warningAggMap);
+                }
+            }
+
+            if(aggregateWarning) {
+                CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning,
log) ;
+            }
             tezScriptState.emitProgressUpdatedNotification(100);
         }
 
@@ -188,6 +205,25 @@ public class TezLauncher extends Launche
         return tezStats;
     }
 
+    void computeWarningAggregate(Map<String, Map<String, Long>> counterGroups,
Map<Enum, Long> aggMap) {
+        for (Map<String, Long> counters : counterGroups.values()) {
+            for (Enum e : PigWarning.values()) {
+                if (counters.containsKey(e.toString())) {
+                    if (aggMap.containsKey(e.toString())) {
+                        Long currentCount = aggMap.get(e.toString());
+                        currentCount = (currentCount == null ? 0 : currentCount);
+                        if (counters != null) {
+                            currentCount += counters.get(e.toString());
+                        }
+                        aggMap.put(e, currentCount);
+                    } else {
+                        aggMap.put(e, counters.get(e.toString()));
+                    }
+                }
+            }
+        }
+    }
+
     private void notifyStarted(TezJob job) throws IOException {
         for (Vertex v : job.getDAG().getVertices()) {
             TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());



Mime
View raw message