Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8E0E318466 for ; Sat, 22 Aug 2015 07:26:00 +0000 (UTC) Received: (qmail 98447 invoked by uid 500); 22 Aug 2015 07:26:00 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 98412 invoked by uid 500); 22 Aug 2015 07:26:00 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 98403 invoked by uid 99); 22 Aug 2015 07:26:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Aug 2015 07:26:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4F474DFD9E; Sat, 22 Aug 2015 07:26:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Sat, 22 Aug 2015 07:26:00 -0000 Message-Id: <2a5d73e46f33459bb1e1ac71d0f152f9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/50] [abbrv] tez git commit: TEZ-2433. Fixes after rebase 05/08. (sseth) Repository: tez Updated Branches: refs/heads/master 9d118e54e -> 2e62e98ec TEZ-2433. Fixes after rebase 05/08. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4bb34211 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4bb34211 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4bb34211 Branch: refs/heads/master Commit: 4bb342115a5af2c28d5ad99245e861e7f62c4298 Parents: df91ad5 Author: Siddharth Seth Authored: Fri May 8 18:43:16 2015 -0700 Committer: Siddharth Seth Committed: Fri Aug 21 18:13:54 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../tez/dag/api/TaskHeartbeatResponse.java | 10 ++++++-- .../dag/app/TaskAttemptListenerImpTezDag.java | 27 ++++++++++---------- .../tez/dag/app/TezTaskCommunicatorImpl.java | 9 +++---- .../app/TestTaskAttemptListenerImplTezDag.java | 10 +++----- .../library/common/shuffle/TestFetcher.java | 8 ++---- 6 files changed, 31 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 9b2339f..ad167ab 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -22,5 +22,6 @@ ALL CHANGES: TEZ-2388. Send dag identifier as part of the fetcher request string. TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups. TEZ-2420. TaskRunner returning before executing the task. + TEZ-2433. Fixes after rebase 05/08 INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java index c82a743..b826e76 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java @@ -22,11 +22,13 @@ import org.apache.tez.runtime.api.impl.TezEvent; public class TaskHeartbeatResponse { private final boolean shouldDie; - private List events; + private final int nextFromEventId; + private final List events; - public TaskHeartbeatResponse(boolean shouldDie, List events) { + public TaskHeartbeatResponse(boolean shouldDie, List events, int nextFromEventId) { this.shouldDie = shouldDie; this.events = events; + this.nextFromEventId = nextFromEventId; } public boolean isShouldDie() { @@ -36,4 +38,8 @@ public class TaskHeartbeatResponse { public List getEvents() { return events; } + + public int getNextFromEventId() { + return nextFromEventId; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index cbaed99..db78fa9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -78,7 +78,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements protected final TaskHeartbeatHandler taskHeartbeatHandler; protected final ContainerHeartbeatHandler containerHeartbeatHandler; - private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null); + private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0); private final ConcurrentMap registeredAttempts = new ConcurrentHashMap(); @@ -194,7 +194,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements // So - avoiding synchronization. pingContainerHeartbeatHandler(containerId); - List outEvents = null; + TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null); TezTaskAttemptID taskAttemptID = request.getTaskAttemptId(); if (taskAttemptID != null) { ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID); @@ -216,12 +216,17 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } List otherEvents = new ArrayList(); + // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events + // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT) + // to VertexImpl to ensure the events ordering + // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent + // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { final EventType eventType = tezEvent.getEventType(); - if (eventType == EventType.TASK_STATUS_UPDATE_EVENT || - eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT) { - context.getEventHandler() - .handle(getTaskAttemptEventFromTezEvent(taskAttemptID, tezEvent)); + if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) { + TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, + (TaskStatusUpdateEvent) tezEvent.getEvent()); + context.getEventHandler().handle(taskAttemptEvent); } else { otherEvents.add(tezEvent); } @@ -232,14 +237,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents))); } taskHeartbeatHandler.pinged(taskAttemptID); - outEvents = context + eventInfo = context .getCurrentDAG() .getVertex(taskAttemptID.getTaskID().getVertexID()) - .getTask(taskAttemptID.getTaskID()) .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getMaxEvents()); } - return new TaskHeartbeatResponse(false, outEvents); + return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId()); } public void taskAlive(TezTaskAttemptID taskAttemptId) { taskHeartbeatHandler.pinged(taskAttemptId); @@ -435,9 +439,4 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements + ", ContainerId not known for this attempt"); } } - - - public TaskCommunicator getTaskCommunicator() { - return taskCommunicators[0]; - } } http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 6200a5b..accde2c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -364,13 +364,10 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { request.getMaxEvents()); tResponse = taskCommunicatorContext.heartbeat(tRequest); } - TezHeartbeatResponse response; - if (tResponse == null) { - response = new TezHeartbeatResponse(); - } else { - response = new TezHeartbeatResponse(tResponse.getEvents()); - } + TezHeartbeatResponse response = new TezHeartbeatResponse(); response.setLastRequestId(requestId); + response.setEvents(tResponse.getEvents()); + response.setNextFromEventId(tResponse.getNextFromEventId()); containerInfo.lastRequestId = requestId; containerInfo.lastResponse = response; return response; http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index 2208220..34b9792 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -48,6 +48,7 @@ import org.apache.tez.common.ContainerTask; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TaskHeartbeatRequest; +import org.apache.tez.dag.api.TaskHeartbeatResponse; import org.apache.tez.dag.api.TezException; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.TezTaskUmbilicalProtocol; @@ -70,8 +71,6 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; -import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; -import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -260,10 +259,9 @@ public class TestTaskAttemptListenerImplTezDag { public void testTaskHeartbeatResponse() throws Exception { List events = new ArrayList(); List eventsToSend = new ArrayList(); - TezHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend); + TaskHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend); assertEquals(2, response.getNextFromEventId()); - assertEquals(1, response.getLastRequestId()); assertEquals(eventsToSend, response.getEvents()); } @@ -320,7 +318,7 @@ public class TestTaskAttemptListenerImplTezDag { return succeedToAllocate; } - private TezHeartbeatResponse generateHeartbeat(List events, + private TaskHeartbeatResponse generateHeartbeat(List events, int fromEventId, int maxEvents, int nextFromEventId, List sendEvents) throws IOException, TezException { ContainerId containerId = createContainerId(appId, 1); @@ -335,7 +333,7 @@ public class TestTaskAttemptListenerImplTezDag { taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0); TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class); - + doReturn(containerId.toString()).when(request).getContainerIdentifier(); doReturn(containerId.toString()).when(request).getContainerIdentifier(); doReturn(taskAttemptID).when(request).getTaskAttemptId(); doReturn(events).when(request).getEvents(); http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index 85e3540..08efb3e 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -31,7 +31,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; @@ -43,11 +42,8 @@ import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.runtime.api.ExecutionContext; -import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.InputIdentifier; @@ -93,7 +89,7 @@ public class TestFetcher { // when enabled and hostname does not match use http fetch. builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, + ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT, false); builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -109,7 +105,7 @@ public class TestFetcher { // when enabled and port does not match use http fetch. builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT, false); + ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT, false); builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build());