tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2265. All inputs/outputs in a task share the same counter object (bikas)
Date Fri, 03 Apr 2015 01:18:51 GMT
Repository: tez
Updated Branches:
  refs/heads/master 212de07db -> 09a96088c


TEZ-2265. All inputs/outputs in a task share the same counter object (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/09a96088
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/09a96088
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/09a96088

Branch: refs/heads/master
Commit: 09a96088c94008e68fdbbffe0045d548fa2c57f5
Parents: 212de07
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Apr 2 18:15:12 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Apr 2 18:16:39 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                       |  1 +
 .../runtime/LogicalIOProcessorRuntimeTask.java    | 10 ++++++----
 .../java/org/apache/tez/runtime/RuntimeTask.java  | 18 +++++++++++++++++-
 .../tez/runtime/api/impl/TezInputContextImpl.java |  7 ++++---
 .../runtime/api/impl/TezOutputContextImpl.java    |  7 ++++---
 .../runtime/api/impl/TezProcessorContextImpl.java |  4 ++--
 .../output/TestOnFileUnorderedKVOutput.java       |  5 ++++-
 .../java/org/apache/tez/test/TestAMRecovery.java  |  6 ++++++
 .../test/java/org/apache/tez/test/TestInput.java  |  3 +++
 .../test/java/org/apache/tez/test/TestOutput.java |  2 ++
 10 files changed, 49 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e844f60..a669147 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2176. Move all logging to slf4j. (commons-logging jar no longer part of Tez tar)
 
 ALL CHANGES:
+  TEZ-2265. All inputs/outputs in a task share the same counter object
   TEZ-2251. Race condition in VertexImpl & Edge causes DAG to hang.
   TEZ-2264. Remove unused taskUmbilical reference in TezTaskRunner, register as running late.
   TEZ-2149. Optimizations for the timed version of DAGClient.getStatus.

http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 2d27c8c..56b2627 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.RunnableWithNdc;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -85,6 +86,7 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -488,7 +490,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         inputSpec.getSourceVertexName(),
         taskSpec.getVertexParallelism(),
         taskSpec.getTaskAttemptID(),
-        tezCounters, inputIndex,
+        inputIndex,
         inputSpec.getInputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, envMap, initialMemoryDistributor,
         inputSpec.getInputDescriptor(), inputMap, inputReadyTracker, objectRegistry,
@@ -503,7 +505,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         outputSpec.getDestinationVertexName(),
         taskSpec.getVertexParallelism(),
         taskSpec.getTaskAttemptID(),
-        tezCounters, outputIndex,
+        outputIndex,
         outputSpec.getOutputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, envMap, initialMemoryDistributor,
         outputSpec.getOutputDescriptor(), objectRegistry, ExecutionContext,
@@ -517,7 +519,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         taskSpec.getDAGName(), taskSpec.getVertexName(),
         taskSpec.getVertexParallelism(),
         taskSpec.getTaskAttemptID(),
-        tezCounters, processorDescriptor.getUserPayload(), this,
+        processorDescriptor.getUserPayload(), this,
         serviceConsumerMetadata, envMap, initialMemoryDistributor,
         processorDescriptor, inputReadyTracker, objectRegistry, ExecutionContext, memAvailable);
     return processorContext;
@@ -694,7 +696,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   public synchronized void cleanup() {
-    LOG.info("Final Counters : " + tezCounters.toShortString());
+    LOG.info("Final Counters : " + getCounters().toShortString());
     setTaskDone();
     if (eventRouterThread != null) {
       eventRouterThread.interrupt();

http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 4dfa936..4777b71 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -31,6 +32,8 @@ import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.metrics.TaskCounterUpdater;
 
+import com.google.common.collect.Maps;
+
 public abstract class RuntimeTask {
 
   protected AtomicBoolean hasFatalError = new AtomicBoolean(false);
@@ -38,6 +41,8 @@ public abstract class RuntimeTask {
   protected String fatalErrorMessage = null;
   protected float progress;
   protected final TezCounters tezCounters;
+  private final Map<String, TezCounters> counterMap = Maps.newConcurrentMap();
+  
   protected final TaskSpec taskSpec;
   protected final Configuration tezConf;
   protected final TezUmbilical tezUmbilical;
@@ -63,6 +68,12 @@ public abstract class RuntimeTask {
 
   protected final AtomicReference<State> state = new AtomicReference<State>();
 
+  public TezCounters addAndGetTezCounter(String name) {
+    TezCounters counter = new TezCounters();
+    counterMap.put(name, counter);
+    return counter;
+  }
+  
   public String getVertexName() {
     return taskSpec.getVertexName();
   }
@@ -90,7 +101,12 @@ public abstract class RuntimeTask {
   }
 
   public TezCounters getCounters() {
-    return this.tezCounters;
+    TezCounters fullCounters = new TezCounters();
+    fullCounters.incrAllCounters(tezCounters);
+    for (TezCounters counter : counterMap.values()) {
+      fullCounters.incrAllCounters(counter);
+    }
+    return fullCounters;
   }
 
   public TezTaskAttemptID getTaskAttemptID() {

http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index a15e072..bd41aed 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -62,7 +62,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
                              TezUmbilical tezUmbilical, String dagName, 
                              String taskVertexName, String sourceVertexName,
                              int vertexParallelism, TezTaskAttemptID taskAttemptID,
-                             TezCounters counters, int inputIndex, @Nullable UserPayload userPayload,
+                             int inputIndex, @Nullable UserPayload userPayload,
                              RuntimeTask runtimeTask,
                              Map<String, ByteBuffer> serviceConsumerMetadata,
                              Map<String, String> auxServiceEnv, MemoryDistributor memDist,
@@ -70,7 +70,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
                              InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry,
                              ExecutionContext ExecutionContext, long memAvailable) {
     super(conf, workDirs, appAttemptNumber, dagName, taskVertexName,
-        vertexParallelism, taskAttemptID, wrapCounters(counters,
+        vertexParallelism, taskAttemptID, wrapCounters(runtimeTask,
         taskVertexName, sourceVertexName, conf), runtimeTask, tezUmbilical,
         serviceConsumerMetadata, auxServiceEnv, memDist, inputDescriptor,
         objectRegistry, ExecutionContext, memAvailable);
@@ -88,8 +88,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
     this.inputReadyTracker = inputReadyTracker;
   }
 
-  private static TezCounters wrapCounters(TezCounters tezCounters, String taskVertexName,
+  private static TezCounters wrapCounters(RuntimeTask task, String taskVertexName,
       String edgeVertexName, Configuration conf) {
+    TezCounters tezCounters = task.addAndGetTezCounter(edgeVertexName);
     if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO,
         TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO_DEFAULT)) {
       return new TezCountersDelegate(tezCounters, taskVertexName, edgeVertexName, "INPUT");

http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index d376b88..8d758f0 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -58,7 +58,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       String taskVertexName,
       String destinationVertexName,
       int vertexParallelism,
-      TezTaskAttemptID taskAttemptID, TezCounters counters, int outputIndex,
+      TezTaskAttemptID taskAttemptID, int outputIndex,
       @Nullable UserPayload userPayload, RuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
@@ -66,7 +66,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       ExecutionContext ExecutionContext, long memAvailable) {
     super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, 
         vertexParallelism, taskAttemptID,
-        wrapCounters(counters, taskVertexName, destinationVertexName, conf),
+        wrapCounters(runtimeTask, taskVertexName, destinationVertexName, conf),
         runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv, memDist, outputDescriptor, objectRegistry, ExecutionContext, memAvailable);
     checkNotNull(outputIndex, "outputIndex is null");
@@ -78,8 +78,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl
         taskVertexName, destinationVertexName, taskAttemptID);
   }
 
-  private static TezCounters wrapCounters(TezCounters tezCounters, String taskVertexName,
+  private static TezCounters wrapCounters(RuntimeTask runtimeTask, String taskVertexName,
       String edgeVertexName, Configuration conf) {
+    TezCounters tezCounters = runtimeTask.addAndGetTezCounter(edgeVertexName);
     if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO,
         TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO_DEFAULT)) {
       return new TezCountersDelegate(tezCounters, taskVertexName, edgeVertexName, "OUTPUT");

http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 16f9a45..edfd8c9 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -54,14 +54,14 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
 
   public TezProcessorContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
       TezUmbilical tezUmbilical, String dagName, String vertexName,
-      int vertexParallelism, TezTaskAttemptID taskAttemptID, TezCounters counters,
+      int vertexParallelism, TezTaskAttemptID taskAttemptID,
       @Nullable UserPayload userPayload, RuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry,
       ExecutionContext ExecutionContext, long memAvailable) {
     super(conf, workDirs, appAttemptNumber, dagName, vertexName, vertexParallelism, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
+        runtimeTask.addAndGetTezCounter(vertexName), runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext, memAvailable);
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
     this.userPayload = userPayload;

http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 509c23d..3e0f6ea 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.junit.Assert.assertEquals;
@@ -205,6 +206,7 @@ public class TestOnFileUnorderedKVOutput {
     TezCounters counters = new TezCounters();
     UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
     RuntimeTask runtimeTask = mock(RuntimeTask.class);
+    when(runtimeTask.addAndGetTezCounter(destinationVertexName)).thenReturn(counters);
 
 
     Map<String, String> auxEnv = new HashMap<String, String>();
@@ -219,9 +221,10 @@ public class TestOnFileUnorderedKVOutput {
 
     OutputContext realOutputContext = new TezOutputContextImpl(conf, new String[] {workDir.toString()},
         appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName,
-        -1, taskAttemptID, counters, 0, userPayload, runtimeTask,
+        -1, taskAttemptID, 0, userPayload, runtimeTask,
         null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null,
         new ExecutionContextImpl("localhost"), 2048);
+    verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName);
     OutputContext outputContext = spy(realOutputContext);
     doAnswer(new Answer() {
       @Override public Object answer(InvocationOnMock invocation) throws Throwable {

http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 66d8373..9e0c02f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -40,6 +40,7 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
@@ -363,6 +364,11 @@ public class TestAMRecovery {
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
+    TezCounter outputCounter = counters.findCounter(TestOutput.COUNTER_NAME, TestOutput.COUNTER_NAME);
+    TezCounter inputCounter = counters.findCounter(TestInput.COUNTER_NAME, TestInput.COUNTER_NAME);
+    // verify that processor, input and output counters, are all being collected
+    Assert.assertTrue(outputCounter.getValue() > 0);
+    Assert.assertTrue(inputCounter.getValue() > 0);
 
     List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
     List<HistoryEvent> historyEvents2 = readRecoveryLog(2);

http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 68ac8e0..eeb565c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -57,6 +57,8 @@ public class TestInput extends AbstractLogicalInput {
   private static final Logger LOG = LoggerFactory
       .getLogger(TestInput.class);
   
+  public static final String COUNTER_NAME = "TestInput";
+
   Configuration conf;
   int numCompletedInputs = 0;
   int[] completedInputVersion;
@@ -369,6 +371,7 @@ public class TestInput extends AbstractLogicalInput {
 
   @Override
   public List<Event> close() throws Exception {
+    getContext().getCounters().findCounter(COUNTER_NAME, COUNTER_NAME).increment(1);;
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/09a96088/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 737f8e7..69dea7e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Lists;
 public class TestOutput extends AbstractLogicalOutput {
   private static final Logger LOG = LoggerFactory.getLogger(TestOutput.class);
 
+  public static final String COUNTER_NAME = "TestOutput";
   public TestOutput(OutputContext outputContext, int numPhysicalOutputs) {
     super(outputContext, numPhysicalOutputs);
   }
@@ -77,6 +78,7 @@ public class TestOutput extends AbstractLogicalOutput {
   @Override
   public List<Event> close() throws Exception {
     LOG.info("Sending data movement event with value: " + output);
+    getContext().getCounters().findCounter(COUNTER_NAME, COUNTER_NAME).increment(1);;
     ByteBuffer result = ByteBuffer.allocate(4).putInt(output);
     result.flip();
     List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs());


Mime
View raw message