Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 75A65172D5 for ; Thu, 5 Feb 2015 21:00:46 +0000 (UTC) Received: (qmail 91966 invoked by uid 500); 5 Feb 2015 21:00:46 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 91847 invoked by uid 500); 5 Feb 2015 21:00:46 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 91710 invoked by uid 99); 5 Feb 2015 21:00:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Feb 2015 21:00:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2C582E040E; Thu, 5 Feb 2015 21:00:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Thu, 05 Feb 2015 21:00:45 -0000 Message-Id: <95f9600993c243ebbcba5227a05620b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/8] tez git commit: TEZ-1929. pre-empted tasks should be marked as killed instead of failed (bikas) Repository: tez Updated Branches: refs/heads/TEZ-2003 753b7efde -> f49c054c9 (forced update) TEZ-1929. pre-empted tasks should be marked as killed instead of failed (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6ba1339d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6ba1339d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6ba1339d Branch: refs/heads/TEZ-2003 Commit: 6ba1339d57a5d05fa14f35f352076131bffea483 Parents: 21bce95 Author: Bikas Saha Authored: Fri Jan 30 16:40:11 2015 -0800 Committer: Bikas Saha Committed: Fri Jan 30 16:40:11 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../rm/container/AMContainerEventCompleted.java | 5 +-- .../dag/app/rm/container/AMContainerImpl.java | 4 +-- .../tez/dag/app/TestMockDAGAppMaster.java | 38 +++++++++++++++++++- .../app/rm/TestTaskSchedulerEventHandler.java | 2 +- .../dag/app/rm/container/TestAMContainer.java | 6 ++-- 6 files changed, 47 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3414adc..43d009d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -55,6 +55,7 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1929. pre-empted tasks should be marked as killed instead of failed TEZ-2017. TEZ UI - Dag view throwing error whild re-displaying additionals in some dags. TEZ-2013. TEZ UI - App Details Page UI Nits TEZ-2014. Tez UI: Nits : All tables, Vertices Page UI. http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java index a455f1e..9bb6d7f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java @@ -37,14 +37,15 @@ public class AMContainerEventCompleted extends AMContainerEvent { } public boolean isPreempted() { - return (exitStatus == ContainerExitStatus.PREEMPTED); + return (exitStatus == ContainerExitStatus.PREEMPTED || + errCause == TaskAttemptTerminationCause.INTERNAL_PREEMPTION); } public boolean isDiskFailed() { return (exitStatus == ContainerExitStatus.DISKS_FAILED); } - public boolean isClusterAction() { + public boolean isSystemAction() { return isPreempted() || isDiskFailed(); } http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 536001c..5c5a8c5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -662,7 +662,7 @@ public class AMContainerImpl implements AMContainer { AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent; if (container.pendingAttempt != null) { String errorMessage = getMessage(container, event); - if (event.isClusterAction()) { + if (event.isSystemAction()) { container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt, errorMessage, event.getTerminationCause()); } else { @@ -921,7 +921,7 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent; - if (event.isClusterAction()) { + if (event.isSystemAction()) { container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt, getMessage(container, event), event.getTerminationCause()); } else { http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index bed971a..a46821a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -37,10 +36,16 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher; import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData; import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError; +import org.apache.tez.dag.app.dag.impl.DAGImpl; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.junit.Assert; import org.junit.Test; @@ -104,6 +109,37 @@ public class TestMockDAGAppMaster { tezClient.stop(); } + @Test (timeout = 5000) + public void testInternalPreemption() throws Exception { + TezConfiguration tezconf = new TezConfiguration(defaultConf); + + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null); + tezClient.start(); + + MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); + MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); + mockLauncher.startScheduling(false); + // there is only 1 task whose first attempt will be preempted + DAG dag = DAG.create("test"); + Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1); + dag.addVertex(vA); + + DAGClient dagClient = tezClient.submitDAG(dag); + mockLauncher.waitTillContainersLaunched(); + ContainerData cData = mockLauncher.getContainers().values().iterator().next(); + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + mockApp.getTaskSchedulerEventHandler().preemptContainer(cData.cId); + + mockLauncher.startScheduling(true); + dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState()); + TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0); + TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0); + TaskAttempt killedTa = dagImpl.getVertex(vA.getName()).getTask(0).getAttempt(killedTaId); + Assert.assertEquals(TaskAttemptState.KILLED, killedTa.getState()); + tezClient.stop(); + } + @Test (timeout = 10000) public void testMultipleSubmissions() throws Exception { Map lrDAG = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index d2dece3..62618cc 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -204,7 +204,7 @@ public class TestTaskSchedulerEventHandler { AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event; Assert.assertEquals(mockCId, completedEvent.getContainerId()); Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics()); - Assert.assertFalse(completedEvent.isPreempted()); + Assert.assertTrue(completedEvent.isPreempted()); Assert.assertFalse(completedEvent.isDiskFailed()); Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, completedEvent.getTerminationCause()); http://git-wip-us.apache.org/repos/asf/tez/blob/6ba1339d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index f273896..438c50d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -704,10 +704,10 @@ public class TestAMContainer { verify(wc.chh).unregister(wc.containerID); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); - Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, - ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause()); verifyUnOrderedOutgoingEventTypes(outgoingEvents, - TaskAttemptEventType.TA_CONTAINER_TERMINATED); + TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); + Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, + ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause()); assertFalse(wc.amContainer.isInErrorState());