Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D2FDB10265 for ; Thu, 7 May 2015 05:03:21 +0000 (UTC) Received: (qmail 1889 invoked by uid 500); 7 May 2015 05:03:21 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 1852 invoked by uid 500); 7 May 2015 05:03:21 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 1843 invoked by uid 99); 7 May 2015 05:03:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 May 2015 05:03:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7BD7EE17A2; Thu, 7 May 2015 05:03:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjffdu@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2404. Handle DataMovementEvent before its TaskAttemptCompletedEvent (zjffdu) Date: Thu, 7 May 2015 05:03:21 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master d5a0f39d9 -> 02870f0ac TEZ-2404. Handle DataMovementEvent before its TaskAttemptCompletedEvent (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/02870f0a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/02870f0a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/02870f0a Branch: refs/heads/master Commit: 02870f0ac1095d67a85a864860b0c4ce68a1db57 Parents: d5a0f39 Author: Jeff Zhang Authored: Thu May 7 13:03:07 2015 +0800 Committer: Jeff Zhang Committed: Thu May 7 13:03:07 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../dag/app/TaskAttemptListenerImpTezDag.java | 35 +++++--------------- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 8 +++++ .../app/TestTaskAttemptListenerImplTezDag.java | 21 +++++++----- 4 files changed, 31 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 58648e4..7feefcc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES Default max limit increased. Should not affect existing users. ALL CHANGES: + TEZ-2404. Handle DataMovementEvent before its TaskAttemptCompletedEvent TEZ-2424. Bump up max counter group name length limit to account for per_io counters. TEZ-2417. Tez UI: Counters are blank in the Attempts page if all attempts failed TEZ-2366. Pig tez MiniTezCluster unit tests fail intermittently after TEZ-2333 http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index d96da83..b38081b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -423,12 +423,17 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } List otherEvents = new ArrayList(); + // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events + // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT) + // to VertexImpl to ensure the events ordering + // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent + // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { final EventType eventType = tezEvent.getEventType(); - if (eventType == EventType.TASK_STATUS_UPDATE_EVENT || - eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT) { - context.getEventHandler() - .handle(getTaskAttemptEventFromTezEvent(taskAttemptID, tezEvent)); + if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) { + TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, + (TaskStatusUpdateEvent) tezEvent.getEvent()); + context.getEventHandler().handle(taskAttemptEvent); } else { otherEvents.add(tezEvent); } @@ -453,28 +458,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } } - private TaskAttemptEvent getTaskAttemptEventFromTezEvent(TezTaskAttemptID taskAttemptID, - TezEvent tezEvent) { - final EventType eventType = tezEvent.getEventType(); - TaskAttemptEvent taskAttemptEvent; - switch (eventType) { - case TASK_STATUS_UPDATE_EVENT: - { - taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, - (TaskStatusUpdateEvent) tezEvent.getEvent()); - } - break; - case TASK_ATTEMPT_COMPLETED_EVENT: - { - taskAttemptEvent = new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE); - } - break; - default: - throw new TezUncheckedException("unknown event type " + eventType); - } - return taskAttemptEvent; - } - private Map convertLocalResourceMap(Map ylrs) throws IOException { Map tlrs = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 9ed7441..5d61642 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -132,6 +132,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted; import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule; import org.apache.tez.dag.app.dag.event.VertexEventTermination; import org.apache.tez.dag.app.dag.event.VertexEventType; +import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo; import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator; import org.apache.tez.dag.history.DAGHistoryEvent; @@ -4131,6 +4132,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl ); } break; + case TASK_ATTEMPT_COMPLETED_EVENT: + { + checkEventSourceMetadata(vertex, sourceMeta); + vertex.getEventHandler().handle( + new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE)); + } + break; default: throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType()); http://git-wip-us.apache.org/repos/asf/tez/blob/02870f0a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index ec4f99a..f974f40 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -53,6 +54,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; @@ -179,8 +181,9 @@ public class TestTaskAttemptListenerImplTezDag { @Test (timeout = 5000) public void testTaskEventRouting() throws Exception { List events = Arrays.asList( - new TezEvent(InputInitializerEvent.create("test_vertex", "test_input", null), null), - new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null) + new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null), + new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null), + new TezEvent(new TaskAttemptCompletedEvent(), null) ); EventHandler eventHandler = generateHeartbeat(events); @@ -193,13 +196,15 @@ public class TestTaskAttemptListenerImplTezDag { assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE, statusUpdateEvent.getType()); - final Event vertexEvent = argAllValues.get(1); final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent; - assertEquals("Other events should be routed to vertex", VertexEventType.V_ROUTE_EVENT, + assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT, vertexEvent.getType()); - assertEquals(EventType.ROOT_INPUT_INITIALIZER_EVENT, + assertEquals(EventType.DATA_MOVEMENT_EVENT, vertexRouteEvent.getEvents().get(0).getEventType()); + assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT, + vertexRouteEvent.getEvents().get(1).getEventType()); + } @Test (timeout = 5000) @@ -213,9 +218,9 @@ public class TestTaskAttemptListenerImplTezDag { verify(eventHandler, times(1)).handle(arg.capture()); final List argAllValues = arg.getAllValues(); - final Event statusUpdateEvent = argAllValues.get(0); - assertEquals("only event should be task done", TaskAttemptEventType.TA_DONE, - statusUpdateEvent.getType()); + final Event event = argAllValues.get(0); + assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT, + event.getType()); } private EventHandler generateHeartbeat(List events) throws IOException, TezException {