hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gunt...@apache.org
Subject svn commit: r1530782 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql: exec/ exec/tez/ log/
Date Wed, 09 Oct 2013 20:24:48 GMT
Author: gunther
Date: Wed Oct  9 20:24:48 2013
New Revision: 1530782

URL: http://svn.apache.org/r1530782
Log:
HIVE-5505: PerfLogger statements for Tez (Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1530782&r1=1530781&r2=1530782&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Wed Oct  9 20:24:48 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.pe
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -44,6 +45,8 @@ import org.apache.hadoop.util.Reflection
 public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
   private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
+  private static final String CLASS_NAME = MapJoinOperator.class.getName();
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
   private transient String tableKey;
   private transient String serdeKey;
@@ -148,11 +151,12 @@ public class MapJoinOperator extends Abs
         hashTblInitedOnce = true;
       }
     }
-
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
     loader.load(this.getExecContext(), hconf, this.getConf(),
         posBigTable, mapJoinTables, mapJoinTableSerdes);
     cache.cache(tableKey, mapJoinTables);
     cache.cache(serdeKey, mapJoinTableSerdes);
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
   }
 
   // Load the hash table

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1530782&r1=1530781&r2=1530782&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Oct  9 20:24:48 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.Writable;
@@ -62,6 +63,7 @@ public class MapRecordProcessor  extends
   @Override
   void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
       OutputCollector out){
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, mrReporter, inputs, out);
 
     //Update JobConf using MRInput, info like filename comes via this
@@ -121,6 +123,7 @@ public class MapRecordProcessor  extends
         throw new RuntimeException("Map operator initialization failed", e);
       }
     }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
   }
 
   private MRInput getMRInput(Map<String, LogicalInput> inputs) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1530782&r1=1530781&r2=1530782&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Wed Oct  9 20:24:48 2013
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -50,6 +51,8 @@ public abstract class RecordProcessor  {
 
   private long numRows = 0;
   private long nextUpdateCntr = 1;
+  protected PerfLogger perfLogger = PerfLogger.getPerfLogger();
+  protected String CLASS_NAME = RecordProcessor.class.getName();
 
 
   /**
@@ -84,7 +87,6 @@ public abstract class RecordProcessor  {
     } catch (Exception e) {
       l4j.info("cannot get classpath: " + e.getMessage());
     }
-
   }
 
   /**

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1530782&r1=1530781&r2=1530782&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Oct  9 20:24:48 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -88,6 +89,7 @@ public class ReduceRecordProcessor  exte
   @Override
   void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
       OutputCollector out){
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, mrReporter, inputs, out);
 
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
@@ -166,7 +168,7 @@ public class ReduceRecordProcessor  exte
     reducer.setReporter(reporter);
     MapredContext.get().setReporter(reporter);
 
-
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
   }
 
   @Override

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1530782&r1=1530781&r2=1530782&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Wed Oct  9 20:24:48 2013
@@ -18,38 +18,37 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import static org.apache.tez.dag.api.client.DAGStatus.State.*;
+import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.client.TezClient;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.Progress;
 
 /**
- * TezJobMonitor keeps track of a tez job while it's being executed. It will 
- * print status to the console and retrieve final status of the job after 
+ * TezJobMonitor keeps track of a tez job while it's being executed. It will
+ * print status to the console and retrieve final status of the job after
  * completion.
  */
 public class TezJobMonitor {
-  
-  static final private Log LOG = LogFactory.getLog(TezJobMonitor.class.getName());
+
+  private static final Log LOG = LogFactory.getLog(TezJobMonitor.class.getName());
+  private static final String CLASS_NAME = TezJobMonitor.class.getName();
 
   private transient LogHelper console;
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+  private Set<String> completed;
 
   public TezJobMonitor() {
     console = new LogHelper(LOG);
@@ -64,6 +63,7 @@ public class TezJobMonitor {
    */
   public int monitorExecution(DAGClient dagClient) throws InterruptedException {
     DAGStatus status = null;
+    completed = new HashSet<String>();
 
     boolean running = false;
     boolean done = false;
@@ -75,14 +75,16 @@ public class TezJobMonitor {
     int rc = 0;
     DAGStatus.State lastState = null;
     String lastReport = null;
-    
+
     console.printInfo("\n");
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
 
     while(true) {
       ++counter;
 
       try {
-        status = dagClient.getDAGStatus();      
+        status = dagClient.getDAGStatus();
         Map<String, Progress> progressMap = status.getVertexProgress();
         failedCounter = 0;
         DAGStatus.State state = status.getState();
@@ -99,11 +101,12 @@ public class TezJobMonitor {
             break;
           case RUNNING:
             if (!running) {
+              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
               console.printInfo("Status: Running\n");
               printTaskNumbers(progressMap, console);
               running = true;
             }
-            
+
             if (counter % printInterval/checkInterval == 0) {
               lastReport = printStatus(progressMap, lastReport, console);
             }
@@ -155,6 +158,7 @@ public class TezJobMonitor {
       }
       Thread.sleep(500);
     }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
     return rc;
   }
 
@@ -165,6 +169,10 @@ public class TezJobMonitor {
     for (String s: keys) {
       Progress progress = progressMap.get(s);
       int percentComplete = (int) (100 * progress.getSucceededTaskCount() / (float) progress.getTotalTaskCount());
+      if (percentComplete == 100 && !completed.contains(s)) {
+        completed.add(s);
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+      }
       reportBuffer.append(String.format("%s: %3d%% complete\t", s, percentComplete));
     }
 
@@ -178,9 +186,10 @@ public class TezJobMonitor {
 
   private void printTaskNumbers(Map<String, Progress> progressMap, LogHelper console) {
     StringBuffer reportBuffer = new StringBuffer();
-    
+
     SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
     for (String s: keys) {
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
       Progress progress = progressMap.get(s);
       int numTasks = progress.getTotalTaskCount();
       if (numTasks == 1) {
@@ -189,7 +198,7 @@ public class TezJobMonitor {
         reportBuffer.append(String.format("%s: %7d tasks\t", s, numTasks));
       }
     }
-    
+
     String report = reportBuffer.toString();
     console.printInfo(report);
     console.printInfo("");

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1530782&r1=1530781&r2=1530782&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Oct  9 20:24:48 2013
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.common.TezUtils;
@@ -46,6 +47,9 @@ public class TezProcessor implements Log
 
   private JobConf jobConf;
 
+  private static final String CLASS_NAME = TezProcessor.class.getName();
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+
   private TezProcessorContext processorContext;
 
   public TezProcessor(boolean isMap) {
@@ -67,16 +71,19 @@ public class TezProcessor implements Log
   @Override
   public void initialize(TezProcessorContext processorContext)
       throws IOException {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
     this.processorContext = processorContext;
     //get the jobconf
     byte[] userPayload = processorContext.getUserPayload();
     Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
     this.jobConf = new JobConf(conf);
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
   }
 
   @Override
   public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
       throws Exception {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
     // in case of broadcast-join read the broadcast edge inputs
     // (possibly asynchronously)
 
@@ -106,6 +113,7 @@ public class TezProcessor implements Log
     rproc.run();
 
     //done - output does not need to be committed as hive does not use outputcommitter
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
   }
 
   /**

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1530782&r1=1530781&r2=1530782&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Wed Oct  9 20:24:48 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.Jo
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
@@ -57,6 +58,9 @@ import org.apache.tez.dag.api.client.DAG
 @SuppressWarnings({"serial", "deprecation"})
 public class TezTask extends Task<TezWork> {
 
+  private static final String CLASS_NAME = TezTask.class.getName();
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+
   public TezTask() {
     super();
   }
@@ -85,6 +89,7 @@ public class TezTask extends Task<TezWor
       if (!session.isOpen()) {
         // can happen if the user sets the tez flag after the session was
         // established
+        LOG.info("Tez session hasn't been created yet. Opening session");
         session.open(ss.getSessionId(), conf);
       }
 
@@ -138,6 +143,7 @@ public class TezTask extends Task<TezWor
       LocalResource appJarLr, Context ctx)
       throws Exception {
 
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
     Map<BaseWork, Vertex> workToVertex = new HashMap<BaseWork, Vertex>();
     Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();
 
@@ -162,10 +168,12 @@ public class TezTask extends Task<TezWor
       boolean isFinal = work.getLeaves().contains(w);
 
       // translate work to vertex
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
       JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
       Vertex wx = DagUtils.createVertex(wxConf, w, tezDir,
          appJarLr, additionalLr, fs, ctx, !isFinal);
       dag.addVertex(wx);
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
       workToVertex.put(w, wx);
       workToConf.put(w, wxConf);
 
@@ -180,7 +188,7 @@ public class TezTask extends Task<TezWor
         dag.addEdge(e);
       }
     }
-
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
     return dag;
   }
 
@@ -188,9 +196,12 @@ public class TezTask extends Task<TezWor
       LocalResource appJarLr, TezSession session)
       throws IOException, TezException, InterruptedException {
 
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
+
     // ready to start execution on the cluster
     DAGClient dagClient = session.submitDAG(dag);
 
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
     return dagClient;
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java?rev=1530782&r1=1530781&r2=1530782&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java Wed Oct  9 20:24:48 2013
@@ -53,6 +53,17 @@ public class PerfLogger {
   public static final String FAILURE_HOOK = "FailureHook.";
   public static final String DRIVER_RUN = "Driver.run";
   public static final String TIME_TO_SUBMIT = "TimeToSubmit";
+  public static final String TEZ_SUBMIT_TO_RUNNING = "TezSubmitToRunningDag";
+  public static final String TEZ_BUILD_DAG = "TezBuildDag";
+  public static final String TEZ_SUBMIT_DAG = "TezSubmitDag";
+  public static final String TEZ_RUN_DAG = "TezRunDag";
+  public static final String TEZ_CREATE_VERTEX = "TezCreateVertex.";
+  public static final String TEZ_RUN_VERTEX = "TezRunVertex.";
+  public static final String TEZ_INITIALIZE_PROCESSOR = "TezInitializeProcessor";
+  public static final String TEZ_RUN_PROCESSOR = "TezRunProcessor";
+  public static final String TEZ_INIT_OPERATORS = "TezInitializeOperators";
+  public static final String LOAD_HASHTABLE = "LoadHashtable";
+
 
   protected static final ThreadLocal<PerfLogger> perfLogger = new ThreadLocal<PerfLogger>();
 



Mime
View raw message