tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [1/2] TEZ-12. Support for counters. (hitesh)
Date Wed, 06 Nov 2013 00:35:16 GMT
Updated Branches:
  refs/heads/master 83a657bb5 -> 6fddbd01b


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index fa2533b..65df726 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -20,9 +20,12 @@ package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.StringTokenizer;
 
 import org.apache.commons.cli.ParseException;
@@ -65,6 +68,7 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
@@ -202,7 +206,7 @@ public class OrderedWordCount {
     List<Vertex> vertices = new ArrayList<Vertex>();
 
     byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
-    byte[] mapInputPayload = 
+    byte[] mapInputPayload =
         MRHelpers.createMRInputPayload(mapPayload, null);
     int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
@@ -229,7 +233,7 @@ public class OrderedWordCount {
     MRHelpers.addMRInput(mapVertex, mapInputPayload, initializerClazz);
     vertices.add(mapVertex);
 
-    Vertex ivertex = new Vertex("ivertex1", new ProcessorDescriptor(
+    Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
         ReduceProcessor.class.getName()).
         setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf)),
         2,
@@ -277,14 +281,14 @@ public class OrderedWordCount {
     System.err.println("Usage (In Session Mode):"
         + " orderedwordcount <in1> <out1> ... <inN> <outN> [-generateSplitsInClient true/<false>]");
   }
-  
-  
+
+
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    
+
     boolean generateSplitsInClient = false;
-    
+
     SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
     try {
       generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
@@ -349,6 +353,11 @@ public class OrderedWordCount {
     }
 
     DAGStatus dagStatus = null;
+    DAGClient dagClient = null;
+    String[] vNames = { "initialmap", "intermediate_reducer",
+      "finalreduce" };
+
+    Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
     try {
       for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
         if (dagIndex != 1
@@ -378,7 +387,6 @@ public class OrderedWordCount {
         DAG dag = createDAG(fs, conf, null, stagingDir,
             dagIndex, inputPath, outputPath, generateSplitsInClient);
 
-        DAGClient dagClient;
         if (useTezSession) {
           LOG.info("Waiting for TezSession to get into ready state");
           waitForTezSessionReady(tezSession);
@@ -391,7 +399,7 @@ public class OrderedWordCount {
         }
 
         while (true) {
-          dagStatus = dagClient.getDAGStatus();
+          dagStatus = dagClient.getDAGStatus(statusGetOpts);
           if(dagStatus.getState() == DAGStatus.State.RUNNING ||
               dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
               dagStatus.getState() == DAGStatus.State.FAILED ||
@@ -406,21 +414,23 @@ public class OrderedWordCount {
           }
         }
 
+
         while (dagStatus.getState() == DAGStatus.State.RUNNING) {
           try {
-            ExampleDriver.printMRRDAGStatus(dagStatus);
+            ExampleDriver.printDAGStatus(dagClient, vNames);
             try {
               Thread.sleep(1000);
             } catch (InterruptedException e) {
               // continue;
             }
-            dagStatus = dagClient.getDAGStatus();
+            dagStatus = dagClient.getDAGStatus(statusGetOpts);
           } catch (TezException e) {
             LOG.fatal("Failed to get application progress. Exiting");
             System.exit(-1);
           }
         }
-        ExampleDriver.printMRRDAGStatus(dagStatus);
+        ExampleDriver.printDAGStatus(dagClient, vNames,
+            true, true);
         LOG.info("DAG " + dagIndex + " completed. "
             + "FinalState=" + dagStatus.getState());
       }
@@ -432,7 +442,7 @@ public class OrderedWordCount {
     }
 
     if (!useTezSession) {
-      ExampleDriver.printMRRDAGStatus(dagStatus);
+      ExampleDriver.printDAGStatus(dagClient, vNames);
       LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
       System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index d5b61b1..e010ec3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -634,7 +634,7 @@ public class YARNRunner implements ClientProtocol {
       if(dagClient == null) {
         dagClient = tezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId());
       }
-      dagStatus = dagClient.getDAGStatus();
+      dagStatus = dagClient.getDAGStatus(null);
       return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile);
     } catch (TezException e) {
       throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index c2b6451..c444cec 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -496,7 +496,7 @@ public class TestMRRJobsDAGApi {
       Assert.assertEquals(TezSessionStatus.RUNNING,
           tezSession.getSessionStatus());
     }
-    DAGStatus dagStatus = dagClient.getDAGStatus();
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
     while (!dagStatus.isCompleted()) {
       LOG.info("Waiting for job to complete. Sleeping for 500ms."
           + " Current state: " + dagStatus.getState());
@@ -510,7 +510,7 @@ public class TestMRRJobsDAGApi {
           dagClient.tryKillDAG();
         }
       }
-      dagStatus = dagClient.getDAGStatus();
+      dagStatus = dagClient.getDAGStatus(null);
     }
     if (dagViaRPC && !reuseSession) {
       tezSession.stop();


Mime
View raw message