pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cheol...@apache.org
Subject svn commit: r1575965 - in /pig/branches/tez: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/tools/pigstats/ src/org/apache/pig/tools/pigstats/mapreduce/ src/org/apache/pig/tools/pigstats/tez/ test/org/apac...
Date Mon, 10 Mar 2014 15:21:51 GMT
Author: cheolsoo
Date: Mon Mar 10 15:21:50 2014
New Revision: 1575965

URL: http://svn.apache.org/r1575965
Log:
PIG-3603: Add counters to TezStats (cheolsoo)

Modified:
    pig/branches/tez/src/org/apache/pig/PigServer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/InputStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/OutputStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
    pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
    pig/branches/tez/test/org/apache/pig/test/TestPigServer.java

Modified: pig/branches/tez/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigServer.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigServer.java Mon Mar 10 15:21:50 2014
@@ -245,7 +245,11 @@ public class PigServer {
         markPredeployedJarsFromProperties();
 
         PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
-        ScriptState.start(pigContext.getExecutionEngine().instantiateScriptState());
+        // ScriptState may have been initialized in Main. In that case, we
+        // should not overwrite it.
+        if (ScriptState.get() == null) {
+            ScriptState.start(pigContext.getExecutionEngine().instantiateScriptState());
+        }
     }
 
     private void addJarsFromProperties() throws ExecException {
@@ -277,7 +281,7 @@ public class PigServer {
     private void markPredeployedJarsFromProperties() throws ExecException {
         // mark jars as predeployed from properties
         String jar_str = pigContext.getProperties().getProperty("pig.predeployed.jars");
-		
+
         if(jar_str != null){
             // Use File.pathSeparator (":" on Linux, ";" on Windows)
             // to correctly handle path aggregates as they are represented

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Mon Mar 10 15:21:50 2014
@@ -18,6 +18,8 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -27,11 +29,18 @@ import org.apache.hadoop.mapreduce.lib.j
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
 import org.apache.tez.client.TezSession;
-import org.apache.tez.dag.api.DAG;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import com.google.common.collect.Maps;
 
 /**
  * Wrapper class that encapsulates Tez DAG. This class mediates between Tez DAGs
@@ -39,6 +48,7 @@ import org.apache.tez.dag.api.client.DAG
  */
 public class TezJob extends ControlledJob {
     private static final Log log = LogFactory.getLog(TezJob.class);
+    private EnumSet<StatusGetOpts> statusGetOpts;
     private DAGStatus dagStatus;
     private Configuration conf;
     private TezDAG dag;
@@ -46,6 +56,8 @@ public class TezJob extends ControlledJo
     private Map<String, LocalResource> requestAMResources;
     private TezSession tezSession;
     private boolean reuseSession;
+    private TezCounters dagCounters;
+    private Map<String, Map<String, Long>> vertexCounters;
 
     public TezJob(TezConfiguration conf, TezDAG dag, Map<String, LocalResource> requestAMResources)
             throws IOException {
@@ -54,9 +66,11 @@ public class TezJob extends ControlledJo
         this.dag = dag;
         this.requestAMResources = requestAMResources;
         this.reuseSession = conf.getBoolean(PigConfiguration.TEZ_SESSION_REUSE, true);
+        this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+        this.vertexCounters = Maps.newHashMap();
     }
 
-    public DAG getDag() {
+    public TezDAG getDag() {
         return dag;
     }
 
@@ -64,6 +78,14 @@ public class TezJob extends ControlledJo
         return dagStatus;
     }
 
+    public TezCounters getDagCounters() {
+        return dagCounters;
+    }
+
+    public Map<String, Long> getVertexCounters(String name) {
+        return vertexCounters.get(name);
+    }
+
     @Override
     public void submit() {
         try {
@@ -82,7 +104,7 @@ public class TezJob extends ControlledJo
 
         while (true) {
             try {
-                dagStatus = dagClient.getDAGStatus(null);
+                dagStatus = dagClient.getDAGStatus(statusGetOpts);
             } catch (Exception e) {
                 log.info("Cannot retrieve DAG status", e);
                 setJobState(ControlledJob.State.FAILED);
@@ -97,6 +119,8 @@ public class TezJob extends ControlledJo
                     sb.append(msg);
                     sb.append("\n");
                 }
+                dagCounters = dagStatus.getDAGCounters();
+                collectVertexCounters();
                 setMessage(sb.toString());
                 TezSessionManager.freeSession(tezSession);
                 try {
@@ -120,6 +144,30 @@ public class TezJob extends ControlledJo
         }
     }
 
+    private void collectVertexCounters() {
+        for (Vertex v : dag.getVertices()) {
+            String name = v.getVertexName();
+            try {
+                VertexStatus s = dagClient.getVertexStatus(name, statusGetOpts);
+                Map<String, Long> cntMap = Maps.newHashMap();
+                TezCounters counters = s.getVertexCounters();
+                Iterator<CounterGroup> grpIt = counters.iterator();
+                while (grpIt.hasNext()) {
+                    Iterator<TezCounter> cntIt = grpIt.next().iterator();
+                    while (cntIt.hasNext()) {
+                        TezCounter cnt = cntIt.next();
+                        cntMap.put(cnt.getName(), cnt.getValue());
+                    }
+                }
+                vertexCounters.put(name, cntMap);
+            } catch (Exception e) {
+                // Don't fail the job even if vertex counters couldn't
+                // be retrieved.
+                log.info("Cannot retrieve counters for vertex " + name, e);
+            }
+        }
+    }
+
     @Override
     public void killJob() throws IOException {
         try {

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/InputStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/InputStats.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/InputStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/InputStats.java Mon Mar 10 15:21:50 2014
@@ -36,22 +36,22 @@ public final class InputStats {
     private long records;
 
     private boolean success;
-    
+
     public static enum INPUT_TYPE { regular, sampler, indexer, side };
- 
+
     private INPUT_TYPE type = INPUT_TYPE.regular;
-    
+
     private Configuration conf;
 
     public InputStats(String location, long bytes, long records, boolean success) {
         this.location = location;
         this.bytes = bytes;
-        this.records = records;        
+        this.records = records;
         this.success = success;
         try {
             this.name = new Path(location).getName();
         } catch (Exception e) {
-            // location is a mal formatted URL 
+            // location is a mal formatted URL
             this.name = location;
         }
     }
@@ -79,14 +79,14 @@ public final class InputStats {
     public Configuration getConf() {
         return conf;
     }
-    
+
     public INPUT_TYPE getInputType() {
         return type;
     }
-    
+
     public String getDisplayString(boolean local) {
         StringBuilder sb = new StringBuilder();
-        if (success) {            
+        if (success) {
             sb.append("Successfully ");
             if (type == INPUT_TYPE.sampler) {
                 sb.append("sampled ");
@@ -95,7 +95,7 @@ public final class InputStats {
             } else {
                 sb.append("read ");
             }
-            
+
             if (!local && records >= 0) {
                 sb.append(records).append(" records ");
             } else {
@@ -115,19 +115,19 @@ public final class InputStats {
         }
         return sb.toString();
     }
-    
+
     public void setConf(Configuration conf) {
         this.conf = conf;
     }
-    
+
     public void markSampleInput() {
         type = INPUT_TYPE.sampler;
     }
-    
+
     public void markIndexerInput() {
         type = INPUT_TYPE.indexer;
     }
-    
+
     public void markSideFileInput() {
         type = INPUT_TYPE.side;
     }

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java Mon Mar 10 15:21:50 2014
@@ -24,8 +24,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.pig.classification.InterfaceAudience;
@@ -47,7 +45,6 @@ public abstract class JobStats extends O
     public static final String ALIAS_LOCATION = "JobStatistics:alias_location";
     public static final String FEATURE = "JobStatistics:feature";
 
-    private static final Log LOG = LogFactory.getLog(JobStats.class);
     public static final String SUCCESS_HEADER = null;
     public static final String FAILURE_HEADER = null;
 
@@ -61,6 +58,9 @@ public abstract class JobStats extends O
 
     protected Configuration conf;
 
+    protected long hdfsBytesRead = 0;
+    protected long hdfsBytesWritten = 0;
+
     private String errorMsg;
 
     private Exception exception = null;
@@ -112,6 +112,14 @@ public abstract class JobStats extends O
         return (String)getAnnotation(FEATURE);
     }
 
+    public long getHdfsBytesRead() {
+        return hdfsBytesRead;
+    }
+
+    public long getHdfsBytesWritten() {
+        return hdfsBytesWritten;
+    }
+
     /**
      * Returns the total bytes written to user specified HDFS
      * locations of this job.
@@ -241,7 +249,7 @@ public abstract class JobStats extends O
      * then use {@link org.apache.pig.tools.pigstats.mapreduce.MRJobStats#getAvgREduceTime} instead.
      */
     @Deprecated
-    abstract public long getAvgREduceTime();
+    abstract public long getAvgReduceTime();
 
     /**
      * @deprecated If you are using mapreduce, please cast JobStats to org.apache.pig.tools.pigstats.mapreduce.MRJobStats,
@@ -259,17 +267,17 @@ public abstract class JobStats extends O
 
     /**
      * @deprecated If you are using mapreduce, please cast JobStats to org.apache.pig.tools.pigstats.mapreduce.MRJobStats,
-     * then use {@link org.apache.pig.tools.pigstats.mapreduce.MRJobStats#getReduceOutputRecords} instead.
+     * then use {@link org.apache.pig.tools.pigstats.mapreduce.MRJobStats#getReduceInputRecords} instead.
      */
     @Deprecated
-    abstract public long getReduceOutputRecords();
+    abstract public long getReduceInputRecords();
 
     /**
      * @deprecated If you are using mapreduce, please cast JobStats to org.apache.pig.tools.pigstats.mapreduce.MRJobStats,
-     * then use {@link org.apache.pig.tools.pigstats.mapreduce.MRJobStats#getReduceInputRecords} instead.
+     * then use {@link org.apache.pig.tools.pigstats.mapreduce.MRJobStats#getReduceOutputRecords} instead.
      */
     @Deprecated
-    abstract public long getReduceInputRecords();
+    abstract public long getReduceOutputRecords();
 
     /**
      * @deprecated If you are using mapreduce, please cast JobStats to org.apache.pig.tools.pigstats.mapreduce.MRJobStats,
@@ -294,23 +302,23 @@ public abstract class JobStats extends O
 
     /**
      * @deprecated If you are using mapreduce, please cast JobStats to org.apache.pig.tools.pigstats.mapreduce.MRJobStats,
-     * then use {@link org.apache.pig.tools.pigstats.mapreduce.MRJobStats#getHdfsBytesWritten} instead.
+     * then use {@link org.apache.pig.tools.pigstats.mapreduce.MRJobStats#getHadoopCounters} instead.
      */
     @Deprecated
-    abstract public long getHdfsBytesWritten();
+    abstract public Counters getHadoopCounters();
 
     /**
      * @deprecated If you are using mapreduce, please cast JobStats to org.apache.pig.tools.pigstats.mapreduce.MRJobStats,
-     * then use {@link org.apache.pig.tools.pigstats.mapreduce.MRJobStats#getHadoopCounters} instead.
+     * then use {@link org.apache.pig.tools.pigstats.mapreduce.MRJobStats#getMultiStoreCounters} instead.
      */
     @Deprecated
-    abstract public Counters getHadoopCounters();
+    abstract public Map<String, Long> getMultiStoreCounters();
 
     /**
      * @deprecated If you are using mapreduce, please cast JobStats to org.apache.pig.tools.pigstats.mapreduce.MRJobStats,
-     * then use {@link org.apache.pig.tools.pigstats.mapreduce.MRJobStats#getMultiStoreCounters} instead.
+     * then use {@link org.apache.pig.tools.pigstats.mapreduce.MRJobStats#getMultiInputCounters} instead.
      */
     @Deprecated
-    abstract public Map<String, Long> getMultiStoreCounters();
+    abstract public Map<String, Long> getMultiInputCounters();
 
 }

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/OutputStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/OutputStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/OutputStats.java Mon Mar 10 15:21:50 2014
@@ -35,7 +35,6 @@ import org.apache.pig.classification.Int
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.ReadToEndLoader;
-import org.apache.pig.newplan.Operator;
 
 /**
  * This class encapsulates the runtime statistics of an user specified output.

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStats.java Mon Mar 10 15:21:50 2014
@@ -268,6 +268,10 @@ public abstract class PigStats {
         return ScriptState.get().getId();
     }
 
+    public String getFileName() {
+        return ScriptState.get().getFileName();
+    }
+
     public String getFeatures() {
         return ScriptState.get().getScriptFeatures();
     }

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Mon Mar 10 15:21:50 2014
@@ -32,89 +32,60 @@ import org.apache.pig.tools.pigstats.map
  */
 public class PigStatsUtil {
 
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_STORE_RECORD_COUNTER} instead.
-     */
-    @Deprecated
-    public static final String MULTI_STORE_RECORD_COUNTER
-            = "Output records in ";
-
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_STORE_COUNTER_GROUP} instead.
-     */
-    @Deprecated
-    public static final String MULTI_STORE_COUNTER_GROUP
-            = "MultiStoreCounters";
-
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#TASK_COUNTER_GROUP} instead.
-     */
-    @Deprecated
-    public static final String TASK_COUNTER_GROUP
-            = "org.apache.hadoop.mapred.Task$Counter";
-
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#FS_COUNTER_GROUP} instead.
-     */
-    @Deprecated
-    public static final String FS_COUNTER_GROUP
-            = HadoopShims.getFsCounterGroupName();
-
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MAP_INPUT_RECORDS} instead.
-     */
-    @Deprecated
     public static final String MAP_INPUT_RECORDS
             = "MAP_INPUT_RECORDS";
-
-    /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MAP_OUTPUT_RECORDS} instead.
-     */
-    @Deprecated
     public static final String MAP_OUTPUT_RECORDS
             = "MAP_OUTPUT_RECORDS";
+    public static final String REDUCE_INPUT_RECORDS
+            = "REDUCE_INPUT_RECORDS";
+    public static final String REDUCE_OUTPUT_RECORDS
+            = "REDUCE_OUTPUT_RECORDS";
+    public static final String HDFS_BYTES_WRITTEN
+            = "HDFS_BYTES_WRITTEN";
+    public static final String HDFS_BYTES_READ
+            = "HDFS_BYTES_READ";
 
     /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#REDUCE_INPUT_RECORDS} instead.
+     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_INPUT_RECORD_COUNTER} instead.
      */
     @Deprecated
-    public static final String REDUCE_INPUT_RECORDS
-            = "REDUCE_INPUT_RECORDS";
+    public static final String MULTI_INPUTS_RECORD_COUNTER
+            = "Input records from ";
 
     /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#REDUCE_OUTPUT_RECORDS} instead.
+     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_INPUT_COUNTER_GROUP} instead.
      */
     @Deprecated
-    public static final String REDUCE_OUTPUT_RECORDS
-            = "REDUCE_OUTPUT_RECORDS";
+    public static final String MULTI_INPUTS_COUNTER_GROUP
+            = "MultiInputCounters";
 
     /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#HDFS_BYTES_WRITTEN} instead.
+     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_STORE_RECORD_COUNTER} instead.
      */
     @Deprecated
-    public static final String HDFS_BYTES_WRITTEN
-            = "HDFS_BYTES_WRITTEN";
+    public static final String MULTI_STORE_RECORD_COUNTER
+            = "Output records in ";
 
     /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#HDFS_BYTES_READ} instead.
+     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_STORE_COUNTER_GROUP} instead.
      */
     @Deprecated
-    public static final String HDFS_BYTES_READ
-            = "HDFS_BYTES_READ";
+    public static final String MULTI_STORE_COUNTER_GROUP
+            = "MultiStoreCounters";
 
     /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_INPUTS_RECORD_COUNTER} instead.
+     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#TASK_COUNTER_GROUP} instead.
      */
     @Deprecated
-    public static final String MULTI_INPUTS_RECORD_COUNTER
-            = "Input records from ";
+    public static final String TASK_COUNTER_GROUP
+            = "org.apache.hadoop.mapred.Task$Counter";
 
     /**
-     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#MULTI_INPUTS_COUNTER_GROUP} instead.
+     * @deprecated use {@link org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil#FS_COUNTER_GROUP} instead.
      */
     @Deprecated
-    public static final String MULTI_INPUTS_COUNTER_GROUP
-            = "MultiInputCounters";
+    public static final String FS_COUNTER_GROUP
+            = HadoopShims.getFsCounterGroupName();
 
     /**
      * Returns an empty PigStats object Use of this method is not advised as it
@@ -173,6 +144,4 @@ public class PigStatsUtil {
         PigStats.start(new EmbeddedPigStats(statsMap));
     }
 
-
 }
-

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/ScriptState.java Mon Mar 10 15:21:50 2014
@@ -252,7 +252,7 @@ public abstract class ScriptState {
     }
 
     public String getFileName() {
-        return fileName;
+        return (fileName == null) ? "" : fileName;
     }
 
     public void setFileName(String fileName) {

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Mon Mar 10 15:21:50 2014
@@ -88,7 +88,6 @@ public final class MRJobStats extends Jo
 
     private Boolean disableCounter = false;
 
-    @SuppressWarnings("deprecation")
     private JobID jobId;
 
     private long maxMapTime = 0;
@@ -107,8 +106,6 @@ public final class MRJobStats extends Jo
     private long mapOutputRecords = 0;
     private long reduceInputRecords = 0;
     private long reduceOutputRecords = 0;
-    private long hdfsBytesWritten = 0;
-    private long hdfsBytesRead = 0;
     private long spillCount = 0;
     private long activeSpillCountObj = 0;
     private long activeSpillCountRecs = 0;
@@ -119,10 +116,8 @@ public final class MRJobStats extends Jo
     private HashMap<String, Long> multiInputCounters
             = new HashMap<String, Long>();
 
-    @SuppressWarnings("deprecation")
     private Counters counters = null;
 
-
     public String getJobId() {
         return (jobId == null) ? null : jobId.toString();
     }
@@ -141,32 +136,32 @@ public final class MRJobStats extends Jo
 
     public long getMinReduceTime() { return minReduceTime; }
 
-    public long getAvgREduceTime() { return avgReduceTime; }
+    public long getAvgReduceTime() { return avgReduceTime; }
 
     public long getMapInputRecords() { return mapInputRecords; }
 
     public long getMapOutputRecords() { return mapOutputRecords; }
 
-    public long getReduceOutputRecords() { return reduceOutputRecords; }
-
     public long getReduceInputRecords() { return reduceInputRecords; }
 
+    public long getReduceOutputRecords() { return reduceOutputRecords; }
+
     public long getSMMSpillCount() { return spillCount; }
 
     public long getProactiveSpillCountObjects() { return activeSpillCountObj; }
 
     public long getProactiveSpillCountRecs() { return activeSpillCountRecs; }
 
-    public long getHdfsBytesWritten() { return hdfsBytesWritten; }
-
-    @SuppressWarnings("deprecation")
     public Counters getHadoopCounters() { return counters; }
 
-
     public Map<String, Long> getMultiStoreCounters() {
         return Collections.unmodifiableMap(multiStoreCounters);
     }
 
+    public Map<String, Long> getMultiInputCounters() {
+        return Collections.unmodifiableMap(multiInputCounters);
+    }
+
     public String getAlias() {
         return (String)getAnnotation(ALIAS);
     }
@@ -187,7 +182,6 @@ public final class MRJobStats extends Jo
         }
     }
 
-    @SuppressWarnings("deprecation")
     void setId(JobID jobId) {
         this.jobId = jobId;
     }
@@ -265,7 +259,6 @@ public final class MRJobStats extends Jo
         return sb.toString();
     }
 
-    @SuppressWarnings("deprecation")
     void addCounters(RunningJob rjob) {
         if (rjob != null) {
             try {

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Mon Mar 10 15:21:50 2014
@@ -50,30 +50,18 @@ import org.apache.pig.tools.pigstats.Job
  */
 public class MRPigStatsUtil extends PigStatsUtil {
 
-    public static final String MULTI_STORE_RECORD_COUNTER
-            = "Output records in ";
-    public static final String MULTI_STORE_COUNTER_GROUP
-            = "MultiStoreCounters";
     public static final String TASK_COUNTER_GROUP
             = "org.apache.hadoop.mapred.Task$Counter";
     public static final String FS_COUNTER_GROUP
             = HadoopShims.getFsCounterGroupName();
-    public static final String MAP_INPUT_RECORDS
-            = "MAP_INPUT_RECORDS";
-    public static final String MAP_OUTPUT_RECORDS
-            = "MAP_OUTPUT_RECORDS";
-    public static final String REDUCE_INPUT_RECORDS
-            = "REDUCE_INPUT_RECORDS";
-    public static final String REDUCE_OUTPUT_RECORDS
-            = "REDUCE_OUTPUT_RECORDS";
-    public static final String HDFS_BYTES_WRITTEN
-            = "HDFS_BYTES_WRITTEN";
-    public static final String HDFS_BYTES_READ
-            = "HDFS_BYTES_READ";
     public static final String MULTI_INPUTS_RECORD_COUNTER
             = "Input records from ";
     public static final String MULTI_INPUTS_COUNTER_GROUP
             = "MultiInputCounters";
+    public static final String MULTI_STORE_RECORD_COUNTER
+            = "Output records in ";
+    public static final String MULTI_STORE_COUNTER_GROUP
+            = "MultiStoreCounters";
 
     private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
 
@@ -90,7 +78,6 @@ public class MRPigStatsUtil extends PigS
      * @param counterName the counter name
      * @return the count of the given counter name
      */
-    @SuppressWarnings("deprecation")
     public static long getMultiStoreCount(Job job, JobClient jobClient,
             String counterName) {
         long value = -1;
@@ -166,7 +153,7 @@ public class MRPigStatsUtil extends PigS
      */
     public static void startCollection(PigContext pc, JobClient client,
             JobControlCompiler jcc, MROperPlan plan) {
-        SimplePigStats ps = (SimplePigStats)PigStats.start(new SimplePigStats());
+        SimplePigStats ps = (SimplePigStats)PigStats.get();
         ps.initialize(pc, client, jcc, plan);
 
         MRScriptState.get().emitInitialPlanNotification(plan);

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java Mon Mar 10 15:21:50 2014
@@ -42,14 +42,26 @@ import org.apache.pig.tools.pigstats.Inp
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Vertex;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class TezStats extends PigStats {
     private static final Log LOG = LogFactory.getLog(TezStats.class);
 
+    public static final String DAG_COUNTER =
+            "org.apache.tez.common.counters.DAGCounter";
+    public static final String FS_COUNTER =
+            "org.apache.tez.common.counters.FileSystemCounter";
+    public static final String TASK_COUNTER =
+            "org.apache.tez.common.counters.TaskCounter";
+
+    private List<String> dagStatsStrings;
     private Map<String, TezTaskStats> tezOpVertexMap;
 
     /**
@@ -82,6 +94,7 @@ public class TezStats extends PigStats {
         this.pigContext = pigContext;
         this.jobPlan = new JobGraph();
         this.tezOpVertexMap = Maps.newHashMap();
+        this.dagStatsStrings = Lists.newArrayList();
     }
 
     public void initialize(TezOperPlan tezPlan) {
@@ -106,6 +119,7 @@ public class TezStats extends PigStats {
         sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", getPigVersion()));
         sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion()));
         sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId));
+        sb.append(String.format("%1$20s: %2$-100s%n", "FileName", getFileName()));
         sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime))));
         sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime))));
         sb.append(String.format("%1$20s: %2$-100s%n", "Features", getFeatures()));
@@ -132,6 +146,12 @@ public class TezStats extends PigStats {
                 sb.append("\n");
             }
         }
+
+        for (String s : dagStatsStrings) {
+            sb.append(s);
+            sb.append("\n");
+        }
+
         List<InputStats> is = getInputStats();
         for (int i = 0; i < is.size(); i++) {
             String s = is.get(i).getDisplayString(isLocal).trim();
@@ -152,21 +172,23 @@ public class TezStats extends PigStats {
      */
     public void accumulateStats(JobControl jc) throws IOException {
         for (ControlledJob job : jc.getSuccessfulJobList()) {
-            addJobStats((TezJob)job, true);
+            addVertexStats((TezJob)job, true);
+            dagStatsStrings.add(getDisplayString((TezJob)job));
         }
         for (ControlledJob job : jc.getFailedJobList()) {
-            addJobStats((TezJob)job, false);
+            addVertexStats((TezJob)job, false);
+            dagStatsStrings.add(getDisplayString((TezJob)job));
         }
     }
 
-    private void addJobStats(TezJob tezJob, boolean succeeded) throws IOException {
+    private void addVertexStats(TezJob tezJob, boolean succeeded) throws IOException {
         DAG dag = tezJob.getDag();
         for (String name : tezOpVertexMap.keySet()) {
             Vertex v = dag.getVertex(name);
             if (v != null) {
                 byte[] bb = v.getProcessorDescriptor().getUserPayload();
                 Configuration conf = TezUtils.createConfFromUserPayload(bb);
-                addVertexStats(name, conf, succeeded);
+                addVertexStats(name, conf, succeeded, tezJob.getVertexCounters(name));
             }
         }
         if (!succeeded) {
@@ -174,14 +196,44 @@ public class TezStats extends PigStats {
         }
     }
 
-    private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded) {
+    private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded,
+            Map<String, Long> counters) {
         TezTaskStats stats = tezOpVertexMap.get(tezOpName);
         stats.setConf(conf);
         stats.setId(tezOpName);
         stats.setSuccessful(succeeded);
-        // TODO: Add error messages for each task in failure case
-        stats.addInputStatistics();
-        stats.addOutputStatistics();
+        stats.addInputStatistics(counters);
+        stats.addOutputStatistics(counters);
+    }
+
+    private static String getDisplayString(TezJob tezJob) {
+        StringBuilder sb = new StringBuilder();
+        TezCounters cnt = tezJob.getDagCounters();
+
+        sb.append(String.format("%1$20s: %2$-100s%n", "JobId",
+                tezJob.getJobID()));
+
+        CounterGroup dagGrp = cnt.getGroup(DAG_COUNTER);
+        TezCounter numTasks = dagGrp.findCounter("TOTAL_LAUNCHED_TASKS");
+        sb.append(String.format("%1$20s: %2$-100s%n", "TotalLaunchedTasks",
+                numTasks.getValue()));
+
+        CounterGroup fsGrp = cnt.getGroup(FS_COUNTER);
+        TezCounter bytesRead = fsGrp.findCounter("FILE_BYTES_READ");
+        TezCounter bytesWritten = fsGrp.findCounter("FILE_BYTES_WRITTEN");
+        sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesRead",
+                bytesRead.getValue()));
+        sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesWritten",
+                bytesWritten.getValue()));
+
+        TezCounter hdfsBytesRead = fsGrp.findCounter("HDFS_BYTES_READ");
+        TezCounter hdfsBytesWritten = fsGrp.findCounter("HDFS_BYTES_WRITTEN");
+        sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesRead",
+                hdfsBytesRead.getValue()));
+        sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesWritten",
+                hdfsBytesWritten.getValue()));
+
+        return sb.toString();
     }
 
     @Override

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Mon Mar 10 15:21:50 2014
@@ -19,6 +19,7 @@ import org.apache.pig.tools.pigstats.Job
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 
 public class TezTaskStats extends JobStats {
     private static final Log LOG = LogFactory.getLog(TezTaskStats.class);
@@ -77,18 +78,22 @@ public class TezTaskStats extends JobSta
         }
     }
 
-    public void addInputStatistics() {
+    public void addInputStatistics(Map<String, Long> counters) {
         if (inputs == null) {
             LOG.warn("Unable to get inputs of the job");
             return;
         }
 
         for (FileSpec fs : loads) {
-            String filename = fs.getFileName();
-            // TODO: Records and bytesRead are always -1 now. We should update
-            // them when Tez supports counters.
             long records = -1;
             long hdfsBytesRead = -1;
+            String filename = fs.getFileName();
+            if (counters.get(PigStatsUtil.MAP_INPUT_RECORDS) != null) {
+                records = counters.get(PigStatsUtil.MAP_INPUT_RECORDS);
+            }
+            if (counters.get(PigStatsUtil.HDFS_BYTES_READ) != null) {
+                hdfsBytesRead = counters.get(PigStatsUtil.HDFS_BYTES_READ);
+            }
             InputStats is = new InputStats(filename, hdfsBytesRead,
                     records, (state == JobState.SUCCESS));
             is.setConf(conf);
@@ -96,18 +101,22 @@ public class TezTaskStats extends JobSta
         }
     }
 
-    public void addOutputStatistics() {
+    public void addOutputStatistics(Map<String, Long> counters) {
         if (stores == null) {
             LOG.warn("Unable to get stores of the job");
             return;
         }
 
         for (POStore sto : stores) {
-            String filename = sto.getSFile().getFileName();
-            // TODO: Records and bytesRead are always -1 now. We should update
-            // them when Tez supports counters.
             long records = -1;
             long hdfsBytesWritten = -1;
+            String filename = sto.getSFile().getFileName();
+            if (counters.get(PigStatsUtil.MAP_OUTPUT_RECORDS) != null) {
+                records = counters.get(PigStatsUtil.MAP_OUTPUT_RECORDS);
+            }
+            if (counters.get(PigStatsUtil.HDFS_BYTES_WRITTEN) != null) {
+                hdfsBytesWritten = counters.get(PigStatsUtil.HDFS_BYTES_WRITTEN);
+            }
             OutputStats os = new OutputStats(filename, hdfsBytesWritten,
                     records, (state == JobState.SUCCESS));
             os.setPOStore(sto);
@@ -160,7 +169,7 @@ public class TezTaskStats extends JobSta
 
     @Override
     @Deprecated
-    public long getAvgREduceTime() {
+    public long getAvgReduceTime() {
         throw new UnsupportedOperationException();
     }
 
@@ -178,13 +187,13 @@ public class TezTaskStats extends JobSta
 
     @Override
     @Deprecated
-    public long getReduceOutputRecords() {
+    public long getReduceInputRecords() {
         throw new UnsupportedOperationException();
     }
 
     @Override
     @Deprecated
-    public long getReduceInputRecords() {
+    public long getReduceOutputRecords() {
         throw new UnsupportedOperationException();
     }
 
@@ -208,19 +217,19 @@ public class TezTaskStats extends JobSta
 
     @Override
     @Deprecated
-    public long getHdfsBytesWritten() {
+    public Counters getHadoopCounters() {
         throw new UnsupportedOperationException();
     }
 
     @Override
     @Deprecated
-    public Counters getHadoopCounters() {
+    public Map<String, Long> getMultiStoreCounters() {
         throw new UnsupportedOperationException();
     }
 
     @Override
     @Deprecated
-    public Map<String, Long> getMultiStoreCounters() {
+    public Map<String, Long> getMultiInputCounters() {
         throw new UnsupportedOperationException();
     }
 }

Modified: pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestCombiner.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestCombiner.java Mon Mar 10 15:21:50 2014
@@ -40,6 +40,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -72,6 +73,14 @@ public class TestCombiner {
         FileLocalizer.setInitialized(false);
     }
 
+    @After
+    public void tearDown() throws Exception{
+        // TODO: once we have Tez local mode, we can get rid of this. For now,
+        // if we run this test suite in Tez mode, we need to set ScriptState to
+        // null to force ScriptState to be initialized every time.
+        ScriptState.start(null);
+    }
+
     @Test
     public void testSuccessiveUserFuncs1() throws Exception {
         String query = "a = load 'students.txt' as (c1,c2,c3,c4); " +

Modified: pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestPigServer.java?rev=1575965&r1=1575964&r2=1575965&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestPigServer.java Mon Mar 10 15:21:50 2014
@@ -102,6 +102,10 @@ public class TestPigServer {
     @After
     public void tearDown() throws Exception{
         tempDir.delete();
+        // TODO: once we have Tez local mode, we can get rid of this. For now,
+        // if we run this test suite in Tez mode, we need to set ScriptState to
+        // null to force ScriptState gets initialized every time.
+        ScriptState.start(null);
     }
 
     @BeforeClass



Mime
View raw message