tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-833. Have the Tez task framework set Framework counters instead of MR Processors setting them. (sseth)
Date Thu, 13 Feb 2014 22:58:05 GMT
Updated Branches:
  refs/heads/master 787921e80 -> 199ea91cd


TEZ-833. Have the Tez task framework set Framework counters instead of
MR Processors setting them. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/199ea91c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/199ea91c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/199ea91c

Branch: refs/heads/master
Commit: 199ea91cda92a99620cc337bd806d61521a950f3
Parents: 787921e
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Feb 13 14:57:26 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Feb 13 14:57:26 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     |   3 +
 .../apache/hadoop/mapred/YarnTezDagChild.java   |  37 ++++-
 .../mapreduce/examples/FilterLinesByWord.java   |  44 +++++-
 .../processor/FileSystemStatisticsUpdater.java  |  84 ----------
 .../tez/mapreduce/processor/GcTimeUpdater.java  |  71 ---------
 .../apache/tez/mapreduce/processor/MRTask.java  | 103 ------------
 .../runtime/LogicalIOProcessorRuntimeTask.java  |   3 +-
 .../org/apache/tez/runtime/RuntimeTask.java     |   7 +
 .../metrics/FileSystemStatisticUpdater.java     |  79 ++++++++++
 .../tez/runtime/metrics/GcTimeUpdater.java      |  70 +++++++++
 .../tez/runtime/metrics/TaskCounterUpdater.java | 156 +++++++++++++++++++
 .../java/org/apache/tez/test/TestTezJobs.java   | 146 +++++++++++++++++
 12 files changed, 532 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 8d347d7..444a830 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -321,4 +321,7 @@ public class TezJobConfig {
   
   public static final String TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_MAX_SIZE = "tez.runtime.broadcast.data-via-events.max-size";
   public static final int TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT = 200 << 10;// 200KB
+  
+  /** Defines the ProcessTree implementation which will be used to collect resource utilization. */
+  public static final String TEZ_RESOURCE_CALCULATOR_PROCESS_TREE_CLASS = "tez.resource.calculator.process-tree.class";
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 9f7455a..0e0d036 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -31,7 +31,6 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -353,6 +352,13 @@ public class YarnTezDagChild {
       public void signalFatalError(TezTaskAttemptID taskAttemptID,
           String diagnostics,
           EventMetaData sourceInfo) {
+        currentTask.setFrameworkCounters();
+        TezEvent statusUpdateEvent =
+            new TezEvent(new TaskStatusUpdateEvent(
+                currentTask.getCounters(), currentTask.getProgress()),
+                new EventMetaData(EventProducerConsumerType.SYSTEM,
+                    currentTask.getVertexName(), "",
+                    currentTask.getTaskAttemptID()));
         TezEvent taskAttemptFailedEvent =
             new TezEvent(new TaskAttemptFailedEvent(diagnostics),
                 sourceInfo);
@@ -360,7 +366,7 @@ public class YarnTezDagChild {
           // Not setting taskComplete - since the main loop responsible for cleanup doesn't have
           // control yet. Getting control depends on whether the I/P/O returns correctly after
           // reporting an error.
-          heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+          heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
         } catch (Throwable t) {
           LOG.fatal("Failed to communicate task attempt failure to AM via"
               + " umbilical", t);
@@ -435,6 +441,9 @@ public class YarnTezDagChild {
         }
         taskCount++;
 
+        // Reset file system statistics for the new task.
+        FileSystem.clearStatistics();
+        
         // Re-use the UGI only if the Credentials have not changed.
         if (containerTask.haveCredentialsChanged()) {
           LOG.info("Refreshing UGI since Credentials have changed");
@@ -516,6 +525,8 @@ public class YarnTezDagChild {
               currentTaskComplete.set(true);
               // TODONEWTEZ Should the container continue to run if the running task reported a fatal error ?
               if (!currentTask.hadFatalError()) {
+                // Set counters in case of a successful task.
+                currentTask.setFrameworkCounters();
                 TezEvent statusUpdateEvent =
                     new TezEvent(new TaskStatusUpdateEvent(
                         currentTask.getCounters(), currentTask.getProgress()),
@@ -525,7 +536,7 @@ public class YarnTezDagChild {
                 TezEvent taskCompletedEvent =
                     new TezEvent(new TaskAttemptCompletedEvent(), sourceInfo);
                 heartbeat(Arrays.asList(statusUpdateEvent, taskCompletedEvent));
-              }
+              } // Should the fatalError be reported ?
             } finally {
               currentTask.cleanup();
             }
@@ -556,14 +567,22 @@ public class YarnTezDagChild {
       try {
         taskLock.readLock().lock();
         if (currentTask != null && !currentTask.hadFatalError()) {
+          // TODO Is this of any use if the heartbeat thread is being interrupted first ?
           // Prevent dup failure events
+          currentTask.setFrameworkCounters();
+          TezEvent statusUpdateEvent =
+              new TezEvent(new TaskStatusUpdateEvent(
+                  currentTask.getCounters(), currentTask.getProgress()),
+                  new EventMetaData(EventProducerConsumerType.SYSTEM,
+                      currentTask.getVertexName(), "",
+                      currentTask.getTaskAttemptID()));
           currentTask.setFatalError(e, "FS Error in Child JVM");
           TezEvent taskAttemptFailedEvent =
               new TezEvent(new TaskAttemptFailedEvent(
                   StringUtils.stringifyException(e)),
                   currentSourceInfo);
           currentTaskComplete.set(true);
-          heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+          heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
         }
       } finally {
         taskLock.readLock().unlock();
@@ -581,13 +600,21 @@ public class YarnTezDagChild {
       taskLock.readLock().lock();
       try {
         if (currentTask != null && !currentTask.hadFatalError()) {
+          // TODO Is this of any use if the heartbeat thread is being interrupted first ?
           // Prevent dup failure events
           currentTask.setFatalError(throwable, "Error in Child JVM");
+          currentTask.setFrameworkCounters();
+          TezEvent statusUpdateEvent =
+              new TezEvent(new TaskStatusUpdateEvent(
+                  currentTask.getCounters(), currentTask.getProgress()),
+                  new EventMetaData(EventProducerConsumerType.SYSTEM,
+                      currentTask.getVertexName(), "",
+                      currentTask.getTaskAttemptID()));
           TezEvent taskAttemptFailedEvent =
             new TezEvent(new TaskAttemptFailedEvent(cause),
               currentSourceInfo);
           currentTaskComplete.set(true);
-          heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+          heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
         }
       } finally {
         taskLock.readLock().unlock();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index f90cc58..d976d17 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -52,6 +52,7 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -67,6 +68,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
@@ -81,17 +83,27 @@ import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 
+import com.google.common.collect.Sets;
+
 public class FilterLinesByWord {
 
   private static Log LOG = LogFactory.getLog(FilterLinesByWord.class);
 
   public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
+  
+  private TezCounters counters = null;
+  private int errorCode = 0;
+  private boolean exitOnCompletion = false;
 
+  public FilterLinesByWord(boolean exitOnCompletion) {
+    this.exitOnCompletion = exitOnCompletion;
+  }
+  
   private static void printUsage() {
     System.err.println("Usage filtelinesrbyword <in> <out> <filter_word> [-generateSplitsInClient true/<false>]");
   }
 
-  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, TezException {
+  public void run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, TezException {
     Configuration conf = new Configuration();
     String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     Credentials credentials = new Credentials();
@@ -105,12 +117,14 @@ public class FilterLinesByWord {
     } catch (ParseException e1) {
       System.err.println("Invalid options");
       printUsage();
-      System.exit(2);
+      errorCode = 2;
+      return;
     }
 
     if (otherArgs.length != 3) {
       printUsage();
-      System.exit(2);
+      errorCode = 2;
+      return;
     }
 
     String inputPath = otherArgs[0];
@@ -120,7 +134,8 @@ public class FilterLinesByWord {
     FileSystem fs = FileSystem.get(conf);
     if (fs.exists(new Path(outputPath))) {
       System.err.println("Output directory : " + outputPath + " already exists");
-      System.exit(2);
+      errorCode = 2;
+      return;
     }
 
     TezConfiguration tezConf = new TezConfiguration(conf);
@@ -270,17 +285,32 @@ public class FilterLinesByWord {
           dagStatus = dagClient.getDAGStatus(null);
         } catch (TezException e) {
           LOG.fatal("Failed to get application progress. Exiting");
-          System.exit(-1);
+          errorCode = -1;
+          return;
         }
       }
+      
+      dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+      counters = dagStatus.getDAGCounters();
+      
     } finally {
       fs.delete(stagingDir, true);
       tezSession.stop();
     }
 
-    ExampleDriver.printDAGStatus(dagClient, vNames);
+    ExampleDriver.printDAGStatus(dagClient, vNames, true, true);
     LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
-    System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+    errorCode = (dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+    return;
+  }
+  
+  public static void main(String[] args) throws IOException, InterruptedException,
+      ClassNotFoundException, TezException {
+    FilterLinesByWord fl = new FilterLinesByWord(true);
+    fl.run(args);
+    if (fl.exitOnCompletion) {
+      System.exit(fl.errorCode);
+    }
   }
 
   public static class TextLongPair implements Writable {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java
deleted file mode 100644
index ff17230..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.mapreduce.processor;
-
-import java.util.List;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.tez.common.counters.FileSystemCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
-
-  /**
-   * An updater that tracks the last number reported for a given file
-   * system and only creates the counters when they are needed.
-   */
-  class FileSystemStatisticUpdater {
-    private List<FileSystem.Statistics> stats;
-    private TezCounter readBytesCounter, writeBytesCounter,
-        readOpsCounter, largeReadOpsCounter, writeOpsCounter;
-    private String scheme;
-    private TezCounters counters;
-
-    FileSystemStatisticUpdater(TezCounters counters, List<FileSystem.Statistics> stats, String scheme) {
-      this.stats = stats;
-      this.scheme = scheme;
-      this.counters = counters;
-    }
-
-    void updateCounters() {
-      if (readBytesCounter == null) {
-        readBytesCounter = counters.findCounter(scheme,
-            FileSystemCounter.BYTES_READ);
-      }
-      if (writeBytesCounter == null) {
-        writeBytesCounter = counters.findCounter(scheme,
-            FileSystemCounter.BYTES_WRITTEN);
-      }
-      if (readOpsCounter == null) {
-        readOpsCounter = counters.findCounter(scheme,
-            FileSystemCounter.READ_OPS);
-      }
-      if (largeReadOpsCounter == null) {
-        largeReadOpsCounter = counters.findCounter(scheme,
-            FileSystemCounter.LARGE_READ_OPS);
-      }
-      if (writeOpsCounter == null) {
-        writeOpsCounter = counters.findCounter(scheme,
-            FileSystemCounter.WRITE_OPS);
-      }
-      long readBytes = 0;
-      long writeBytes = 0;
-      long readOps = 0;
-      long largeReadOps = 0;
-      long writeOps = 0;
-      for (FileSystem.Statistics stat: stats) {
-        readBytes = readBytes + stat.getBytesRead();
-        writeBytes = writeBytes + stat.getBytesWritten();
-        readOps = readOps + stat.getReadOps();
-        largeReadOps = largeReadOps + stat.getLargeReadOps();
-        writeOps = writeOps + stat.getWriteOps();
-      }
-      readBytesCounter.setValue(readBytes);
-      writeBytesCounter.setValue(writeBytes);
-      readOpsCounter.setValue(readOps);
-      largeReadOpsCounter.setValue(largeReadOps);
-      writeOpsCounter.setValue(writeOps);
-    }
-  }
-  

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java
deleted file mode 100644
index 79ff419..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.mapreduce.processor;
-
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.util.List;
-
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.common.counters.TaskCounter;
-
-/**
-   * An updater that tracks the amount of time this task has spent in GC.
-   */
-  class GcTimeUpdater {
-    private long lastGcMillis = 0;
-    private List<GarbageCollectorMXBean> gcBeans = null;
-    TezCounters counters;
-
-    public GcTimeUpdater(TezCounters counters) {
-      this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
-      getElapsedGc(); // Initialize 'lastGcMillis' with the current time spent.
-      this.counters = counters;
-    }
-
-    /**
-     * @return the number of milliseconds that the gc has used for CPU
-     * since the last time this method was called.
-     */
-    protected long getElapsedGc() {
-      long thisGcMillis = 0;
-      for (GarbageCollectorMXBean gcBean : gcBeans) {
-        thisGcMillis += gcBean.getCollectionTime();
-      }
-
-      long delta = thisGcMillis - lastGcMillis;
-      this.lastGcMillis = thisGcMillis;
-      return delta;
-    }
-
-    /**
-     * Increment the gc-elapsed-time counter.
-     */
-    public void incrementGcCounter() {
-      if (null == counters) {
-        return; // nothing to do.
-      }
-
-      TezCounter gcCounter =
-        counters.findCounter(TaskCounter.GC_TIME_MILLIS);
-      if (null != gcCounter) {
-        gcCounter.increment(getElapsedGc());
-      }
-    }
-  }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 242b798..a5fda8c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -22,10 +22,8 @@ import java.io.IOException;
 import java.net.URI;
 import java.text.NumberFormat;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -63,11 +61,9 @@ import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezTaskStatus.State;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -98,9 +94,6 @@ public abstract class MRTask {
 
   // Current counters
   transient TezCounters counters;
-  protected GcTimeUpdater gcUpdater;
-  private ResourceCalculatorProcessTree pTree;
-  private long initCpuCumulativeTime = 0;
   protected TezProcessorContext processorContext;
   protected TaskAttemptID taskAttemptId;
   protected Progress progress = new Progress();
@@ -125,12 +118,6 @@ public abstract class MRTask {
   protected MRTaskReporter mrReporter;
   protected boolean useNewApi;
 
-  /**
-   * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
-   */
-  private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
-     new HashMap<String, FileSystemStatisticUpdater>();
-
   public MRTask(boolean isMap) {
     this.isMap = isMap;
   }
@@ -150,9 +137,6 @@ public abstract class MRTask {
             (isMap ? TaskType.MAP : TaskType.REDUCE),
             context.getTaskIndex()),
           context.getTaskAttemptNumber());
-    // TODO TEZAM4 Figure out initialization / run sequence of Input, Process,
-    // Output. Phase is MR specific.
-    gcUpdater = new GcTimeUpdater(counters);
 
     byte[] userPayload = context.getUserPayload();
     Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
@@ -168,8 +152,6 @@ public abstract class MRTask {
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         context.getDAGAttemptNumber());
 
-    initResourceCalculatorPlugin();
-
     LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
 
     // TODO Post MRR
@@ -322,20 +304,6 @@ public abstract class MRTask {
     }
   }
 
-
-  private void initResourceCalculatorPlugin() {
-    Class<? extends ResourceCalculatorProcessTree> clazz =
-        this.jobConf.getClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
-            null, ResourceCalculatorProcessTree.class);
-    pTree = ResourceCalculatorProcessTree
-        .getResourceCalculatorProcessTree(System.getenv().get("JVM_PID"), clazz, this.jobConf);
-    LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
-    if (pTree != null) {
-      pTree.updateProcessTree();
-      initCpuCumulativeTime = pTree.getCumulativeCpuTime();
-    }
-  }
-
   public TezProcessorContext getUmbilical() {
     return this.processorContext;
   }
@@ -419,7 +387,6 @@ public abstract class MRTask {
   }
 
   public void done() throws IOException, InterruptedException {
-    updateCounters();
 
     LOG.info("Task:" + taskAttemptId + " is done."
         + " And is in the process of committing");
@@ -435,12 +402,7 @@ public abstract class MRTask {
       }
     }
     taskDone.set(true);
-    // Make sure we send at least one set of counter increments. It's
-    // ok to call updateCounters() in this thread after comm thread stopped.
-    updateCounters();
     sendLastUpdate();
-    //signal the tasktracker that we are done
-    //sendDone(umbilical);
   }
 
   /**
@@ -505,71 +467,6 @@ public abstract class MRTask {
     }
   }
 
-
-  public void updateCounters() {
-    // TODO Auto-generated method stub
-    // TODO TEZAM Implement.
-    Map<String, List<FileSystem.Statistics>> map = new
-        HashMap<String, List<FileSystem.Statistics>>();
-    for(Statistics stat: FileSystem.getAllStatistics()) {
-      String uriScheme = stat.getScheme();
-      if (map.containsKey(uriScheme)) {
-        List<FileSystem.Statistics> list = map.get(uriScheme);
-        list.add(stat);
-      } else {
-        List<FileSystem.Statistics> list = new ArrayList<FileSystem.Statistics>();
-        list.add(stat);
-        map.put(uriScheme, list);
-      }
-    }
-    for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet()) {
-      FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
-      if(updater==null) {//new FileSystem has been found in the cache
-        updater =
-            new FileSystemStatisticUpdater(counters, entry.getValue(),
-                entry.getKey());
-        statisticUpdaters.put(entry.getKey(), updater);
-      }
-      updater.updateCounters();
-    }
-
-    gcUpdater.incrementGcCounter();
-    updateResourceCounters();
-  }
-
-  /**
-   * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
-   * current total committed heap space usage of this JVM.
-   */
-  private void updateHeapUsageCounter() {
-    long currentHeapUsage = Runtime.getRuntime().totalMemory();
-    counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES)
-            .setValue(currentHeapUsage);
-  }
-
-  /**
-   * Update resource information counters
-   */
-  void updateResourceCounters() {
-    // Update generic resource counters
-    updateHeapUsageCounter();
-
-    // Updating resources specified in ResourceCalculatorPlugin
-    if (pTree == null) {
-      return;
-    }
-    pTree.updateProcessTree();
-    long cpuTime = pTree.getCumulativeCpuTime();
-    long pMem = pTree.getCumulativeRssmem();
-    long vMem = pTree.getCumulativeVmem();
-    // Remove the CPU time consumed previously by JVM reuse
-    cpuTime -= initCpuCumulativeTime;
-    counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
-    counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
-    counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
-  }
-
-
   public static String normalizeStatus(String status, Configuration conf) {
     // Check to see if the status string is too long
     // and truncate it if needed.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index a8bb1d4..21cc810 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -140,6 +140,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.state = State.NEW;
     this.appAttemptNumber = appAttemptNumber;
     int numInitializers = numInputs + numOutputs; // Processor is initialized in the main thread.
+    numInitializers = (numInitializers == 0 ? 1 : numInitializers); 
     this.initializerExecutor = Executors.newFixedThreadPool(
         numInitializers,
         new ThreadFactoryBuilder().setDaemon(true)
@@ -684,5 +685,5 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   public LogicalIOProcessor getProcessor() {
     return this.processor;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index f018333..30fbe76 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -28,6 +28,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.metrics.TaskCounterUpdater;
 
 public abstract class RuntimeTask {
 
@@ -41,6 +42,7 @@ public abstract class RuntimeTask {
   protected final TezUmbilical tezUmbilical;
   protected final AtomicInteger eventCounter;
   private final AtomicBoolean taskDone;
+  private final TaskCounterUpdater counterUpdater;
 
   protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
       TezUmbilical tezUmbilical) {
@@ -51,6 +53,7 @@ public abstract class RuntimeTask {
     this.eventCounter = new AtomicInteger(0);
     this.progress = 0.0f;
     this.taskDone = new AtomicBoolean(false);
+    this.counterUpdater = new TaskCounterUpdater(tezCounters, tezConf);
   }
 
   protected enum State {
@@ -98,6 +101,10 @@ public abstract class RuntimeTask {
   public boolean isTaskDone() {
     return taskDone.get();
   }
+  
+  public void setFrameworkCounters() {
+    this.counterUpdater.updateCounters();
+  }
 
   protected void setTaskDone() {
     taskDone.set(true);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
new file mode 100644
index 0000000..bb15ef1
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+
+/**
+ * Publishes the aggregated {@link FileSystem.Statistics} of one URI scheme as
+ * file system counters. The counters are looked up lazily, the first time
+ * {@link #updateCounters()} runs, and the same instances are reused afterwards.
+ */
+public class FileSystemStatisticUpdater {
+
+  private List<FileSystem.Statistics> stats;
+  private TezCounter readBytesCounter;
+  private TezCounter writeBytesCounter;
+  private TezCounter readOpsCounter;
+  private TezCounter largeReadOpsCounter;
+  private TezCounter writeOpsCounter;
+  private String scheme;
+  private TezCounters counters;
+
+  /**
+   * @param counters the counters in which the per-scheme values are published
+   * @param stats the statistics objects whose values are summed up
+   * @param scheme the URI scheme used as the counter group key
+   */
+  FileSystemStatisticUpdater(TezCounters counters, List<FileSystem.Statistics> stats,
+      String scheme) {
+    this.counters = counters;
+    this.scheme = scheme;
+    this.stats = stats;
+  }
+
+  /**
+   * Sums every tracked statistic over all {@link FileSystem.Statistics}
+   * instances and overwrites the matching counters with the totals.
+   */
+  void updateCounters() {
+    lookupMissingCounters();
+
+    long totalBytesRead = 0;
+    long totalBytesWritten = 0;
+    long totalReadOps = 0;
+    long totalLargeReadOps = 0;
+    long totalWriteOps = 0;
+    for (FileSystem.Statistics fsStats : stats) {
+      totalBytesRead += fsStats.getBytesRead();
+      totalBytesWritten += fsStats.getBytesWritten();
+      totalReadOps += fsStats.getReadOps();
+      totalLargeReadOps += fsStats.getLargeReadOps();
+      totalWriteOps += fsStats.getWriteOps();
+    }
+
+    readBytesCounter.setValue(totalBytesRead);
+    writeBytesCounter.setValue(totalBytesWritten);
+    readOpsCounter.setValue(totalReadOps);
+    largeReadOpsCounter.setValue(totalLargeReadOps);
+    writeOpsCounter.setValue(totalWriteOps);
+  }
+
+  /**
+   * Looks up each counter that has not been resolved yet. The lookup order is
+   * fixed: bytes read, bytes written, read ops, large read ops, write ops.
+   */
+  private void lookupMissingCounters() {
+    if (readBytesCounter == null) {
+      readBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_READ);
+    }
+    if (writeBytesCounter == null) {
+      writeBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN);
+    }
+    if (readOpsCounter == null) {
+      readOpsCounter = counters.findCounter(scheme, FileSystemCounter.READ_OPS);
+    }
+    if (largeReadOpsCounter == null) {
+      largeReadOpsCounter = counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS);
+    }
+    if (writeOpsCounter == null) {
+      writeOpsCounter = counters.findCounter(scheme, FileSystemCounter.WRITE_OPS);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/GcTimeUpdater.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/GcTimeUpdater.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/GcTimeUpdater.java
new file mode 100644
index 0000000..5e551c5
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/GcTimeUpdater.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.counters.TaskCounter;
+
+/**
+ * Tracks the garbage collection time of this JVM and adds the time spent
+ * since the previous reading to the GC_TIME_MILLIS task counter.
+ */
+class GcTimeUpdater {
+  private long lastGcMillis = 0;
+  private List<GarbageCollectorMXBean> gcBeans = null;
+  TezCounters counters;
+
+  /**
+   * @param counters the counters to which GC time is added; may be null, in
+   *          which case {@link #incrementGcCounter()} does nothing
+   */
+  public GcTimeUpdater(TezCounters counters) {
+    this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+    // Take a first reading so that 'lastGcMillis' starts at the GC time
+    // already accumulated; the returned delta is deliberately ignored.
+    getElapsedGc();
+    this.counters = counters;
+  }
+
+  /**
+   * @return the collection time, in milliseconds, accumulated over all
+   *         garbage collectors since the last time this method was called.
+   */
+  protected long getElapsedGc() {
+    long totalGcMillis = 0;
+    for (GarbageCollectorMXBean bean : gcBeans) {
+      totalGcMillis += bean.getCollectionTime();
+    }
+
+    long previousGcMillis = lastGcMillis;
+    this.lastGcMillis = totalGcMillis;
+    return totalGcMillis - previousGcMillis;
+  }
+
+  /**
+   * Adds the GC time elapsed since the previous reading to the
+   * GC_TIME_MILLIS counter, provided both the counters and the counter exist.
+   */
+  void incrementGcCounter() {
+    if (counters != null) {
+      TezCounter gcTimeCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS);
+      if (gcTimeCounter != null) {
+        gcTimeCounter.increment(getElapsedGc());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
new file mode 100644
index 0000000..07e1869
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounters;
+
+
+/**
+ * Updates counters with various task specific statistics. Currently, this
+ * should be invoked only once per task. TODO Eventually - change this so that
+ * counters can be updated incrementally during task execution.
+ */
+public class TaskCounterUpdater {
+
+  private static final Log LOG = LogFactory.getLog(TaskCounterUpdater.class);
+
+  private final TezCounters tezCounters;
+  private final Configuration conf;
+
+  /**
+   * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
+   */
+  private final Map<String, FileSystemStatisticUpdater> statisticUpdaters =
+      new HashMap<String, FileSystemStatisticUpdater>();
+  protected GcTimeUpdater gcUpdater;
+  private ResourceCalculatorProcessTree pTree;
+  /**
+   * Cumulative CPU time of the process tree at the time this updater was
+   * created; subtracted from later readings when reporting CPU usage.
+   */
+  private long initCpuCumulativeTime = 0;
+
+  /**
+   * Creates the updater and records the CPU time already consumed by the
+   * process tree, so that only the CPU time used afterwards is reported.
+   *
+   * @param counters the counters that receive the statistics
+   * @param conf the configuration from which the process tree class is read
+   */
+  public TaskCounterUpdater(TezCounters counters, Configuration conf) {
+    this.tezCounters = counters;
+    this.conf = conf;
+    this.gcUpdater = new GcTimeUpdater(tezCounters);
+    initResourceCalculatorPlugin();
+    recordInitialCpuStats();
+  }
+
+  /**
+   * Updates the file system counters (one group per URI scheme), the GC time
+   * counter and the resource (heap, CPU, memory) counters.
+   */
+  public void updateCounters() {
+    // FileSystemStatistics are reset each time a new task is seen by the
+    // container.
+    // This doesn't remove the fileSystem, and does not clear all statistics -
+    // so there is a potential of an unused FileSystem showing up for a
+    // Container, and strange values for READ_OPS etc.
+    Map<String, List<FileSystem.Statistics>> statsByScheme =
+        new HashMap<String, List<FileSystem.Statistics>>();
+    for (Statistics stat : FileSystem.getAllStatistics()) {
+      String uriScheme = stat.getScheme();
+      List<FileSystem.Statistics> schemeStats = statsByScheme.get(uriScheme);
+      if (schemeStats == null) {
+        schemeStats = new ArrayList<FileSystem.Statistics>();
+        statsByScheme.put(uriScheme, schemeStats);
+      }
+      schemeStats.add(stat);
+    }
+    for (Map.Entry<String, List<FileSystem.Statistics>> entry : statsByScheme.entrySet()) {
+      FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
+      if (updater == null) { // new FileSystem has been found in the cache
+        updater =
+            new FileSystemStatisticUpdater(tezCounters, entry.getValue(),
+                entry.getKey());
+        statisticUpdaters.put(entry.getKey(), updater);
+      }
+      updater.updateCounters();
+    }
+
+    gcUpdater.incrementGcCounter();
+    updateResourceCounters();
+  }
+
+  /**
+   * Records the cumulative CPU time of the process tree as the baseline for
+   * later CPU readings. Does nothing when no process tree is available.
+   */
+  private void recordInitialCpuStats() {
+    if (pTree != null) {
+      pTree.updateProcessTree();
+      initCpuCumulativeTime = pTree.getCumulativeCpuTime();
+    }
+  }
+
+  /**
+   * Update resource information counters
+   */
+  void updateResourceCounters() {
+    // Update generic resource counters
+    updateHeapUsageCounter();
+
+    // Updating resources specified in ResourceCalculatorPlugin
+    if (pTree == null) {
+      return;
+    }
+    pTree.updateProcessTree();
+    long pMem = pTree.getCumulativeRssmem();
+    long vMem = pTree.getCumulativeVmem();
+    // Remove the CPU time consumed previously by JVM reuse. The baseline must
+    // be subtracted exactly once; subtracting it again would under-report
+    // the CPU time by the baseline value.
+    long cpuTime = pTree.getCumulativeCpuTime() - initCpuCumulativeTime;
+    tezCounters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
+    tezCounters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
+    tezCounters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
+  }
+
+  /**
+   * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
+   * current total committed heap space usage of this JVM.
+   */
+  private void updateHeapUsageCounter() {
+    long currentHeapUsage = Runtime.getRuntime().totalMemory();
+    tezCounters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES)
+            .setValue(currentHeapUsage);
+  }
+
+  /**
+   * Creates the process tree used for CPU and memory readings from the class
+   * configured under TEZ_RESOURCE_CALCULATOR_PROCESS_TREE_CLASS and the
+   * "JVM_PID" environment variable. The CPU baseline is recorded separately
+   * by {@link #recordInitialCpuStats()}.
+   */
+  private void initResourceCalculatorPlugin() {
+    Class<? extends ResourceCalculatorProcessTree> clazz = this.conf.getClass(
+        TezJobConfig.TEZ_RESOURCE_CALCULATOR_PROCESS_TREE_CLASS, null,
+        ResourceCalculatorProcessTree.class);
+
+    pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
+        System.getenv().get("JVM_PID"), clazz, conf);
+
+    LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
new file mode 100644
index 0000000..ebbc3c1
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.examples.ExampleDriver;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Tests which do not rely on Map/Reduce processor
+ * 
+ */
+public class TestTezJobs {
+
+  private static final Log LOG = LogFactory.getLog(TestTezJobs.class);
+
+  protected static MiniTezCluster mrrTezCluster;
+  protected static MiniDFSCluster dfsCluster;
+
+  private static final Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestTezJobs.class.getName() + "-tmpDir";
+
+  /**
+   * Starts a mini DFS cluster and, if none is running yet, a mini Tez cluster
+   * that uses that DFS as its default file system.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+          .build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (mrrTezCluster == null) {
+      mrrTezCluster = new MiniTezCluster(TestTezJobs.class.getName(), 1, 1, 1);
+      // Separate configuration for the Tez cluster; named differently from the
+      // static 'conf' field so that it does not shadow it.
+      Configuration tezClusterConf = new Configuration();
+      tezClusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      mrrTezCluster.init(tezClusterConf);
+      mrrTezCluster.start();
+    }
+
+  }
+
+  /**
+   * Stops the mini Tez cluster and shuts down the mini DFS cluster.
+   */
+  @AfterClass
+  public static void tearDown() {
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+    // TODO Add cleanup code.
+  }
+
+  /**
+   * Runs a single-vertex DAG using the SleepProcessor and verifies that the
+   * DAG succeeds and that its counters contain the file system counter group
+   * and the GC time counter.
+   */
+  @Test(timeout = 60000)
+  public void testSleepJob() throws TezException, IOException, InterruptedException {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessor");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(new Random()
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    TezClient tezClient = new TezClient(tezConf);
+    AMConfiguration amConf = new AMConfiguration(new HashMap<String, String>(),
+        new HashMap<String, LocalResource>(), tezConf, null);
+
+    DAGClient dagClient = tezClient.submitDAGApplication(dag, amConf);
+
+    // Poll until the DAG reaches a completed state; the test timeout bounds
+    // the total wait.
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500L);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    // Fetch the status once more, this time asking for the counters.
+    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    assertNotNull(dagStatus.getDAGCounters());
+    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
+    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
+    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
+
+  }
+}


Mime
View raw message