tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly (Kuhu Shukla via jeagles)
Date Thu, 08 Dec 2016 16:53:10 GMT
Repository: tez
Updated Branches:
  refs/heads/master 125f8c023 -> 2b42ac8a0


TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly
(Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2b42ac8a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2b42ac8a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2b42ac8a

Branch: refs/heads/master
Commit: 2b42ac8a0836b9501e5acc03ecf85d7f3dd4cc92
Parents: 125f8c0
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Thu Dec 8 10:52:46 2016 -0600
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Thu Dec 8 10:52:46 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  2 +
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 76 ++++++++++++++++++++
 .../org/apache/tez/test/TestFaultTolerance.java |  3 +-
 4 files changed, 83 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2b42ac8a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a7299b7..2dd9b0a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly
   TEZ-3552. Shuffle split array when size-based sorting is turned off.
   TEZ-3537. ArrayIndexOutOfBoundsException with empty environment variables/Port YARN-3768 to Tez
   TEZ-3271. Provide mapreduce failures.maxpercent equivalent.
@@ -156,6 +157,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly
   TEZ-3537. ArrayIndexOutOfBoundsException with empty environment variables/Port YARN-3768 to Tez
   TEZ-3536. NPE in WebUIService start when host resolution fails.
   TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code.
@@ -665,6 +667,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly
   TEZ-3537. ArrayIndexOutOfBoundsException with empty environment variables/Port YARN-3768 to Tez
   TEZ-3536. NPE in WebUIService start when host resolution fails.
   TEZ-3493. DAG submit timeout cannot be set to a month

http://git-wip-us.apache.org/repos/asf/tez/blob/2b42ac8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index e5f3e71..8a81575 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -509,6 +509,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.taskSpec = taskSpec;
     this.creationCausalTA = schedulingCausalTA;
     this.creationTime = clock.getTime();
+    //set last notified progress time to current time
+    this.lastNotifyProgressTimestamp = clock.getTime();
 
     this.reportedStatus = new TaskAttemptStatus(this.attemptId);
     initTaskAttemptStatus(reportedStatus);

http://git-wip-us.apache.org/repos/asf/tez/blob/2b42ac8a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index a50ca49..44d8213 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -943,6 +943,82 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2);
   }
 
+  @Test
+  public void testProgressTimeStampUpdate() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+    taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 75);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container, 0, 0, 0);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    Clock mockClock = mock(Clock.class);
+    when(mockClock.getTime()).thenReturn(50l);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, mockClock,
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    verify(mockHeartbeatHandler).register(taskAttemptID);
+
+    when(mockClock.getTime()).thenReturn(100l);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    verify(eventHandler, atLeast(1)).handle(arg.capture());
+    if (arg.getValue() instanceof TaskAttemptEventAttemptFailed) {
+      TaskAttemptEventAttemptFailed fEvent = (TaskAttemptEventAttemptFailed) arg.getValue();
+      assertEquals(taImpl.getID(), fEvent.getTaskAttemptID());
+      assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, fEvent.getTerminationCause());
+      taImpl.handle(fEvent);
+      fail("Should not fail since the timestamps do not differ by progress interval config");
+    } else {
+      Assert.assertEquals("Task Attempt's internal state should be RUNNING!",
+          taImpl.getInternalState(), TaskAttemptStateInternal.RUNNING);
+    }
+    when(mockClock.getTime()).thenReturn(200l);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    verify(eventHandler, atLeast(1)).handle(arg.capture());
+    Assert.assertTrue("This should have been an attempt failed event!", arg.getValue() instanceof TaskAttemptEventAttemptFailed);
+  }
+
   @Test (timeout = 5000)
   public void testNoProgressFail() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);

http://git-wip-us.apache.org/repos/asf/tez/blob/2b42ac8a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 764ef0f..b2a5d17 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -777,8 +777,9 @@ public class TestFaultTolerance {
   @Test (timeout=240000)
   public void testNoProgress() throws Exception {
     Configuration testConf = new Configuration(false);
-    testConf.setInt(TestProcessor.TEZ_FAILING_PROCESSOR_SLEEP_MS, 1000*100); // long sleep
     testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    testConf.setLong(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_SLEEP_MS, "v1"), 1000*100); // long sleep
     DAG dag = SimpleTestDAG.createDAG(testConf);
     Vertex hung = dag.getVertex("v1");
     hung.setConf(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, Long.toString(1000));


Mime
View raw message