tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-2836. Avoid setting framework/system counters for tasks running in threads. (sseth)
Date Mon, 28 Sep 2015 23:10:25 GMT
Repository: tez
Updated Branches:
  refs/heads/master 406721ab1 -> 8b412ee66


TEZ-2836. Avoid setting framework/system counters for tasks running in
threads. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8b412ee6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8b412ee6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8b412ee6

Branch: refs/heads/master
Commit: 8b412ee66fe042db60a567ff71639839af5fa854
Parents: 406721a
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Sep 28 16:10:11 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Sep 28 16:10:11 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../app/launcher/LocalContainerLauncher.java    |  2 +-
 .../tez/service/impl/ContainerRunnerImpl.java   |  4 +-
 .../tez/mapreduce/processor/MapUtils.java       |  2 +-
 .../processor/reduce/TestReduceProcessor.java   |  2 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  5 +-
 .../org/apache/tez/runtime/RuntimeTask.java     | 12 ++-
 .../org/apache/tez/runtime/task/TezChild.java   | 14 ++--
 .../apache/tez/runtime/task/TezTaskRunner2.java |  8 +-
 .../TestLogicalIOProcessorRuntimeTask.java      |  7 +-
 .../tez/runtime/task/TestTaskExecution2.java    | 77 +++++++++++++++++---
 11 files changed, 99 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 01fa23e..d219127 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2836. Avoid setting framework/system counters for tasks running in threads.
   TEZ-2398. Flaky test: TestFaultTolerance
   TEZ-2833. Dont create extra directory during ATS file download
   TEZ-2834. Make Tez preemption resilient to incorrect free resource reported

http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 6cd6fce..9267f00 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -356,7 +356,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
     TezChild tezChild =
         TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
             attemptNumber, localDirs, workingDirectory, containerEnv, "", executionContext, credentials,
-            memAvailable, context.getUser(), tezTaskUmbilicalProtocol);
+            memAvailable, context.getUser(), tezTaskUmbilicalProtocol, false);
     return tezChild;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index fb4c08f..ad05af9 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -305,7 +305,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
               request.getContainerIdString(),
               request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs,
               envMap, objectRegistry, pid,
-              executionContext, credentials, memoryAvailable, request.getUser(), null);
+              executionContext, credentials, memoryAvailable, request.getUser(), null, false);
       ContainerExecutionResult result = tezChild.run();
       LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
           sw.stop().elapsedMillis());
@@ -449,7 +449,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
           request.getAppAttemptNumber(),
           serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry,
           pid,
-          executionContext, memoryAvailable);
+          executionContext, memoryAvailable, false);
 
       boolean shouldDie;
       try {

http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 8841882..71aa87c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -232,7 +232,7 @@ public class MapUtils {
         serviceConsumerMetadata,
         envMap,
         HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
-        Runtime.getRuntime().maxMemory());
+        Runtime.getRuntime().maxMemory(), true);
     return task;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index fcb42b3..db78b6e 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -224,7 +224,7 @@ public class TestReduceProcessor {
         serviceConsumerMetadata,
         serviceProviderEnvMap,
         HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
-        Runtime.getRuntime().maxMemory());
+        Runtime.getRuntime().maxMemory(), true);
 
     List<Event> destEvents = new LinkedList<Event>();
     destEvents.add(dme);

http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 5b0e62f..5db96c5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -156,10 +156,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap,
       Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry,
-      String pid, ExecutionContext ExecutionContext, long memAvailable) throws IOException {
+      String pid, ExecutionContext ExecutionContext, long memAvailable,
+      boolean updateSysCounters) throws IOException {
     // Note: If adding any fields here, make sure they're cleaned up in the cleanupContext method.
     // TODO Remove jobToken from here post TEZ-421
-    super(taskSpec, tezConf, tezUmbilical, pid);
+    super(taskSpec, tezConf, tezUmbilical, pid, updateSysCounters);
     LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
         + taskSpec);
     int numInputs = taskSpec.getInputs().size();

http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 33c0113..c9c6ba1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -56,7 +56,7 @@ public abstract class RuntimeTask {
   private final TaskStatistics statistics;
 
   protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
-      TezUmbilical tezUmbilical, String pid) {
+      TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) {
     this.taskSpec = taskSpec;
     this.tezConf = tezConf;
     this.tezUmbilical = tezUmbilical;
@@ -67,7 +67,11 @@ public abstract class RuntimeTask {
     this.progress = 0.0f;
     this.taskDone = new AtomicBoolean(false);
     this.statistics = new TaskStatistics();
-    this.counterUpdater = new TaskCounterUpdater(tezCounters, tezConf, pid);
+    if (setupSysCounterUpdater) {
+      this.counterUpdater = new TaskCounterUpdater(tezCounters, tezConf, pid);
+    } else {
+      this.counterUpdater = null;
+    }
   }
 
   protected enum State {
@@ -160,7 +164,9 @@ public abstract class RuntimeTask {
   }
   
   public void setFrameworkCounters() {
-    this.counterUpdater.updateCounters();
+    if (counterUpdater != null) {
+      this.counterUpdater.updateCounters();
+    }
   }
 
   protected void setTaskDone() {

http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index edc8208..e9b48f4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -109,6 +109,7 @@ public class TezChild {
   private final long memAvailable;
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   private final String user;
+  private final boolean updateSysCounters;
 
   private Multimap<String, String> startedInputsMap = HashMultimap.create();
   private final boolean ownUmbilical;
@@ -123,8 +124,8 @@ public class TezChild {
       Map<String, String> serviceProviderEnvMap,
       ObjectRegistryImpl objectRegistry, String pid,
       ExecutionContext executionContext,
-      Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical)
-      throws IOException, InterruptedException {
+      Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical,
+      boolean updateSysCounters) throws IOException, InterruptedException {
     this.defaultConf = conf;
     this.containerIdString = containerIdentifier;
     this.appAttemptNumber = appAttemptNumber;
@@ -136,6 +137,7 @@ public class TezChild {
     this.credentials = credentials;
     this.memAvailable = memAvailable;
     this.user = user;
+    this.updateSysCounters = updateSysCounters;
 
     getTaskMaxSleepTime = defaultConf.getInt(
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
@@ -248,7 +250,7 @@ public class TezChild {
         TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI,
             localDirs, containerTask.getTaskSpec(), appAttemptNumber,
             serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
-            executor, objectRegistry, pid, executionContext, memAvailable);
+            executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters);
         boolean shouldDie;
         try {
           TaskRunner2Result result = taskRunner.run();
@@ -433,7 +435,7 @@ public class TezChild {
       String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
       Map<String, String> serviceProviderEnvMap, @Nullable String pid,
       ExecutionContext executionContext, Credentials credentials, long memAvailable, String user,
-      TezTaskUmbilicalProtocol tezUmbilical)
+      TezTaskUmbilicalProtocol tezUmbilical, boolean updateSysCounters)
       throws IOException, InterruptedException, TezException {
 
     // Pull in configuration specified for the session.
@@ -446,7 +448,7 @@ public class TezChild {
 
     return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
         attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid,
-        executionContext, credentials, memAvailable, user, tezUmbilical);
+        executionContext, credentials, memAvailable, user, tezUmbilical, updateSysCounters);
   }
 
   public static void main(String[] args) throws IOException, InterruptedException, TezException {
@@ -482,7 +484,7 @@ public class TezChild {
         tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
         System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),
         credentials, Runtime.getRuntime().maxMemory(), System
-            .getenv(ApplicationConstants.Environment.USER.toString()), null);
+            .getenv(ApplicationConstants.Environment.USER.toString()), null, true);
     tezChild.run();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 7fd4c75..4fdc17d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -55,7 +55,8 @@ public class TezTaskRunner2 {
 
   private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner2.class);
 
-  private final LogicalIOProcessorRuntimeTask task;
+  @VisibleForTesting
+  final LogicalIOProcessorRuntimeTask task;
   private final UserGroupInformation ugi;
 
   private final TaskReporterInterface taskReporter;
@@ -100,7 +101,8 @@ public class TezTaskRunner2 {
                         Multimap<String, String> startedInputsMap,
                         TaskReporterInterface taskReporter, ListeningExecutorService executor,
                         ObjectRegistry objectRegistry, String pid,
-                        ExecutionContext executionContext, long memAvailable) throws
+                        ExecutionContext executionContext, long memAvailable,
+                        boolean updateSysCounters) throws
       IOException {
     this.ugi = ugi;
     this.taskReporter = taskReporter;
@@ -108,7 +110,7 @@ public class TezTaskRunner2 {
     this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler();
     this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs,
         umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap,
-        objectRegistry, pid, executionContext, memAvailable);
+        objectRegistry, pid, executionContext, memAvailable, updateSysCounters);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 0acb7b8..0fc3919 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -35,7 +34,6 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -44,7 +42,6 @@ import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Reader;
@@ -85,7 +82,7 @@ public class TestLogicalIOProcessorRuntimeTask {
 
     LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
         umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
-        "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
+        "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true);
 
     try {
       lio1.initialize();
@@ -113,7 +110,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
     LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null,
         umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
-        "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
+        "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true);
     try {
       lio2.initialize();
       lio2.run();

http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index 2123757..989753b 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -39,6 +39,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -49,6 +50,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -58,6 +64,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
@@ -150,7 +157,8 @@ public class TestTaskExecution2 {
       TaskReporter taskReporter = createTaskReporter(appId, umbilical);
 
       TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
-          TestProcessor.CONF_EMPTY);
+          TestProcessor.CONF_EMPTY, true);
+      LogicalIOProcessorRuntimeTask runtimeTask = taskRunner.task;
       // Setup the executor
       Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit(
           new TaskRunnerCallable2ForTest(taskRunner));
@@ -162,9 +170,12 @@ public class TestTaskExecution2 {
       umbilical.verifyTaskSuccessEvent();
       assertFalse(TestProcessor.wasAborted());
       umbilical.resetTrackedEvents();
+      TezCounters tezCounters = runtimeTask.getCounters();
+      verifySysCounters(tezCounters, 5, 5);
 
       taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
-          TestProcessor.CONF_EMPTY);
+          TestProcessor.CONF_EMPTY, false);
+      runtimeTask = taskRunner.task;
       // Setup the executor
       taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
       // Signal the processor to go through
@@ -174,11 +185,14 @@ public class TestTaskExecution2 {
       assertNull(taskReporter.currentCallable);
       umbilical.verifyTaskSuccessEvent();
       assertFalse(TestProcessor.wasAborted());
+      tezCounters = runtimeTask.getCounters();
+      verifySysCounters(tezCounters, -1, -1);
     } finally {
       executor.shutdownNow();
     }
   }
 
+
   // test task failed due to exception in Processor
   @Test(timeout = 5000)
   public void testFailedTaskTezException() throws IOException, InterruptedException, TezException,
@@ -231,7 +245,7 @@ public class TestTaskExecution2 {
       TaskReporter taskReporter = createTaskReporter(appId, umbilical);
 
       TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
-          "NotExitedProcessor", TestProcessor.CONF_EMPTY, false);
+          "NotExitedProcessor", TestProcessor.CONF_EMPTY, false, true);
       // Setup the executor
       Future<TaskRunner2Result> taskRunnerFuture =
           taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
@@ -484,6 +498,35 @@ public class TestTaskExecution2 {
     }
   }
 
+  private void verifySysCounters(TezCounters tezCounters, int minTaskCounterCount, int minFsCounterCount) {
+
+    Preconditions.checkArgument((minTaskCounterCount > 0 && minFsCounterCount > 0) ||
+        (minTaskCounterCount <= 0 && minFsCounterCount <= 0),
+        "Both targetCounter counts should be postitive or negative. A mix is not expected");
+
+    int numTaskCounters = 0;
+    int numFsCounters = 0;
+    for (CounterGroup counterGroup : tezCounters) {
+      if (counterGroup.getName().equals(TaskCounter.class.getName())) {
+        for (TezCounter ignored : counterGroup) {
+          numTaskCounters++;
+        }
+      } else if (counterGroup.getName().equals(FileSystemCounter.class.getName())) {
+        for (TezCounter ignored : counterGroup) {
+          numFsCounters++;
+        }
+      }
+    }
+
+    // If Target <=0, assert counter count is exactly 0
+    if (minTaskCounterCount <= 0) {
+      assertEquals(0, numTaskCounters);
+      assertEquals(0, numFsCounters);
+    } else {
+      assertTrue(numTaskCounters >= minTaskCounterCount);
+      assertTrue(numFsCounters >= minFsCounterCount);
+    }
+  }
 
   private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result,
                                       EndReason expectedEndReason, Throwable expectedThrowable,
@@ -530,10 +573,20 @@ public class TestTaskExecution2 {
   private TezTaskRunner2 createTaskRunner(ApplicationId appId,
                                           TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
                                           TaskReporter taskReporter,
-                                          ListeningExecutorService executor, byte[] processorConf)
+                                          ListeningExecutorService executor, byte[] processorConf) throws
+      IOException {
+    return createTaskRunner(appId, umbilical, taskReporter, executor, processorConf, true);
+
+  }
+
+  private TezTaskRunner2 createTaskRunner(ApplicationId appId,
+                                          TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
+                                          TaskReporter taskReporter,
+                                          ListeningExecutorService executor, byte[] processorConf,
+                                          boolean updateSysCounters)
       throws IOException {
     return createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(),
-        processorConf, false);
+        processorConf, false, updateSysCounters);
   }
 
   private TezTaskRunner2ForTest createTaskRunnerForTest(ApplicationId appId,
@@ -544,14 +597,15 @@ public class TestTaskExecution2 {
       throws IOException {
     return (TezTaskRunner2ForTest) createTaskRunner(appId, umbilical, taskReporter, executor,
         TestProcessor.class.getName(),
-        processorConf, true);
+        processorConf, true, true);
   }
 
   private TezTaskRunner2 createTaskRunner(ApplicationId appId,
                                           TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
                                           TaskReporter taskReporter,
                                           ListeningExecutorService executor, String processorClass,
-                                          byte[] processorConf, boolean testRunner) throws
+                                          byte[] processorConf, boolean testRunner,
+                                          boolean updateSysCounters) throws
       IOException {
     TezConfiguration tezConf = new TezConfiguration(defaultConf);
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -574,13 +628,13 @@ public class TestTaskExecution2 {
           new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
           HashMultimap.<String, String>create(), taskReporter,
           executor, null, "", new ExecutionContextImpl("localhost"),
-          Runtime.getRuntime().maxMemory());
+          Runtime.getRuntime().maxMemory(), updateSysCounters);
     } else {
       taskRunner = new TezTaskRunner2(tezConf, ugi, localDirs, taskSpec, 1,
           new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
           HashMultimap.<String, String>create(), taskReporter,
           executor, null, "", new ExecutionContextImpl("localhost"),
-          Runtime.getRuntime().maxMemory());
+          Runtime.getRuntime().maxMemory(), updateSysCounters);
     }
 
     return taskRunner;
@@ -604,10 +658,11 @@ public class TestTaskExecution2 {
                                  ObjectRegistry objectRegistry,
                                  String pid,
                                  ExecutionContext executionContext,
-                                 long memAvailable) throws IOException {
+                                 long memAvailable,
+                                 boolean updateSysCounters) throws IOException {
       super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata,
           serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid,
-          executionContext, memAvailable);
+          executionContext, memAvailable, updateSysCounters);
     }
 
 


Mime
View raw message