tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [02/27] tez git commit: TEZ-14. Support MR like speculation capabilities based on latency deviation from the mean (bikas)
Date Wed, 10 Dec 2014 03:30:12 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index c98e3de..6796d02 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ClusterInfo;
@@ -69,6 +70,7 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
@@ -76,8 +78,11 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
@@ -87,6 +92,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -538,7 +544,191 @@ public class TestTaskAttempt {
     // captured - TA already succeeded.
     assertEquals(0, taImpl.getDiagnostics().size());
   }
+  
+  @Test//(timeout = 5000)
+  public void testFailure() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+    taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
+
+    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+        resource, createFakeContainerContext(), false);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+
+    int expectedEventsAtRunning = 4;
+    verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
+    verifyEventType(
+        arg.getAllValues().subList(0,
+            expectedEventsAtRunning), VertexEventTaskAttemptStatusUpdate.class, 1);
+    
+    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f)));
+    
+    taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0"));
+
+    assertEquals("Task attempt is not in the  FAIL_IN_PROGRESS state", taImpl.getInternalState(),
+        TaskAttemptStateInternal.FAIL_IN_PROGRESS);
+
+    assertEquals(1, taImpl.getDiagnostics().size());
+    assertEquals("0", taImpl.getDiagnostics().get(0));
+    
+    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "1"));
+
+    assertEquals(2, taImpl.getDiagnostics().size());
+    assertEquals("1", taImpl.getDiagnostics().get(1));
+
+    int expectedEvenstAfterTerminating = expectedEventsAtRunning + 5;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
+
+
+    Event e = verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+    assertEquals(TaskEventType.T_ATTEMPT_FAILED, e.getType());
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), VertexEventTaskAttemptStatusUpdate.class, 2);
+  }
+  
+  @Test//(timeout = 5000)
+  public void testSuccess() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+    taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
+
+    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+        resource, createFakeContainerContext(), false);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+
+    int expectedEventsAtRunning = 4;
+    verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
+    verifyEventType(
+        arg.getAllValues().subList(0,
+            expectedEventsAtRunning), VertexEventTaskAttemptStatusUpdate.class, 1);
+    
+    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f)));
+    
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
 
+    assertEquals("Task attempt is not in the  SUCCEEDED state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+
+    assertEquals(0, taImpl.getDiagnostics().size());
+
+    int expectedEvenstAfterTerminating = expectedEventsAtRunning + 5;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
+
+
+    Event e = verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+    assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, e.getType());
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), VertexEventTaskAttemptStatusUpdate.class, 2);
+  }
+  
   @Test(timeout = 5000)
   // Ensure Container Preemption race with task completion is handled correctly by
   // the TaskAttempt
@@ -921,17 +1111,20 @@ public class TestTaskAttempt {
         arg.capture());
   }
 
-  private void verifyEventType(List<Event> events,
+  private Event verifyEventType(List<Event> events,
       Class<? extends Event> eventClass, int expectedOccurences) {
     int count = 0;
+    Event ret = null;
     for (Event e : events) {
       if (eventClass.isInstance(e)) {
         count++;
+        ret = e;
       }
     }
     assertEquals(
         "Mismatch in num occurences of event: " + eventClass.getCanonicalName(),
         expectedOccurences, count);
+    return ret;
   }
 
   public static class MockEventHandler implements EventHandler {

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/dag/speculation/legacy/TestDataStatistics.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/speculation/legacy/TestDataStatistics.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/speculation/legacy/TestDataStatistics.java
new file mode 100644
index 0000000..fe09594
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/speculation/legacy/TestDataStatistics.java
@@ -0,0 +1,73 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.speculation.legacy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDataStatistics {
+
+  private static final double TOL = 0.001;
+
+  @Test
+  public void testEmptyDataStatistics() throws Exception {
+    DataStatistics statistics = new DataStatistics();
+    Assert.assertEquals(0, statistics.count(), TOL);
+    Assert.assertEquals(Long.MAX_VALUE, statistics.mean(), TOL);
+    Assert.assertEquals(0, statistics.var(), TOL);
+    Assert.assertEquals(0, statistics.std(), TOL);
+    Assert.assertEquals(Long.MAX_VALUE, statistics.outlier(1.0f), TOL);
+  }
+  
+  @Test
+  public void testSingleEntryDataStatistics() throws Exception {
+    DataStatistics statistics = new DataStatistics(17.29);
+    Assert.assertEquals(1, statistics.count(), TOL);
+    Assert.assertEquals(17.29, statistics.mean(), TOL);
+    Assert.assertEquals(0, statistics.var(), TOL);
+    Assert.assertEquals(0, statistics.std(), TOL);
+    Assert.assertEquals(17.29, statistics.outlier(1.0f), TOL);
+  }
+  
+  @Test
+  public void testMutiEntryDataStatistics() throws Exception {
+    DataStatistics statistics = new DataStatistics();
+    statistics.add(17);
+    statistics.add(29);
+    Assert.assertEquals(2, statistics.count(), TOL);
+    Assert.assertEquals(23.0, statistics.mean(), TOL);
+    Assert.assertEquals(36.0, statistics.var(), TOL);
+    Assert.assertEquals(6.0, statistics.std(), TOL);
+    Assert.assertEquals(29.0, statistics.outlier(1.0f), TOL);
+ }
+  
+  @Test
+  public void testUpdateStatistics() throws Exception {
+    DataStatistics statistics = new DataStatistics(17);
+    statistics.add(29);
+    Assert.assertEquals(2, statistics.count(), TOL);
+    Assert.assertEquals(23.0, statistics.mean(), TOL);
+    Assert.assertEquals(36.0, statistics.var(), TOL);
+
+    statistics.updateStatistics(17, 29);
+    Assert.assertEquals(2, statistics.count(), TOL);
+    Assert.assertEquals(29.0, statistics.mean(), TOL);
+    Assert.assertEquals(0.0, statistics.var(), TOL);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 5db17c3..35e6dcd 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -132,7 +132,7 @@ public class TestDAGRecovery {
 
     tezConf = new TezConfiguration(miniTezCluster.getConfig());
     tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
-    tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+    tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO");
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
         remoteStagingDir.toString());
     tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);

http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 58b9413..ab2fef8 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -121,6 +121,7 @@ public class MultiAttemptDAG {
         String payload = new String(getContext().getUserPayload().deepCopyAsArray());
         int successAttemptId = Integer.valueOf(payload);
         LOG.info("Checking whether to crash AM or schedule tasks"
+            + ", vertex: " + getContext().getVertexName()
             + ", successfulAttemptID=" + successAttemptId
             + ", currentAttempt=" + getContext().getDAGAttemptNumber());
         if (successAttemptId > getContext().getDAGAttemptNumber()) {


Mime
View raw message