tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [1/2] tez git commit: TEZ-2234. Add API for statistics information - allow vertex managers to get output size per source vertex (bikas)
Date Sat, 11 Apr 2015 05:52:54 GMT
Repository: tez
Updated Branches:
  refs/heads/master c8ef2442d -> bd9b8d951


http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 3e0f6ea..41eb9a4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.protobuf.ByteString;
+
 import org.apache.commons.lang.RandomStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,12 +56,13 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
@@ -72,6 +74,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovem
 import org.apache.tez.runtime.library.testutils.KVDataGen;
 import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -87,6 +90,8 @@ public class TestOnFileUnorderedKVOutput {
   private static Path workDir = null;
   private static final int shufflePort = 2112;
 
+  TaskStatistics stats;
+
   static {
     defaultConf.set("fs.defaultFS", "file:///");
     try {
@@ -102,6 +107,7 @@ public class TestOnFileUnorderedKVOutput {
 
   @Before
   public void setup() throws Exception {
+    stats = new TaskStatistics();
     localFs.mkdirs(workDir);
   }
 
@@ -126,12 +132,13 @@ public class TestOnFileUnorderedKVOutput {
     assertTrue(events != null && events.size() == 0);
 
     KeyValueWriter kvWriter = kvOutput.getWriter();
-    List<KVPair> data = KVDataGen.generateTestData(true);
+    List<KVPair> data = KVDataGen.generateTestData(true, 0);
     for (KVPair kvp : data) {
       kvWriter.write(kvp.getKey(), kvp.getvalue());
     }
 
     events = kvOutput.close();
+    assertEquals(45, stats.getIOStatistics().values().iterator().next().getDataSize());
     assertTrue(events != null && events.size() == 1);
     CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0);
 
@@ -205,9 +212,9 @@ public class TestOnFileUnorderedKVOutput {
     TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
     TezCounters counters = new TezCounters();
     UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-    RuntimeTask runtimeTask = mock(RuntimeTask.class);
+    LogicalIOProcessorRuntimeTask runtimeTask = mock(LogicalIOProcessorRuntimeTask.class);
     when(runtimeTask.addAndGetTezCounter(destinationVertexName)).thenReturn(counters);
-
+    when(runtimeTask.getTaskStatistics()).thenReturn(stats);
 
     Map<String, String> auxEnv = new HashMap<String, String>();
     ByteBuffer bb = ByteBuffer.allocate(4);
@@ -225,6 +232,9 @@ public class TestOnFileUnorderedKVOutput {
         null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null,
         new ExecutionContextImpl("localhost"), 2048);
     verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName);
+    verify(runtimeTask, times(1)).getTaskStatistics();
+    // verify output stats object got created
+    Assert.assertTrue(stats.getIOStatistics().containsKey(destinationVertexName));
     OutputContext outputContext = spy(realOutputContext);
     doAnswer(new Answer() {
       @Override public Object answer(InvocationOnMock invocation) throws Throwable {


Mime
View raw message