Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0AAD61873E for ; Thu, 30 Apr 2015 04:24:25 +0000 (UTC) Received: (qmail 56326 invoked by uid 500); 30 Apr 2015 04:24:25 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 56294 invoked by uid 500); 30 Apr 2015 04:24:24 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 56285 invoked by uid 99); 30 Apr 2015 04:24:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Apr 2015 04:24:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9644EE3598; Thu, 30 Apr 2015 04:24:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjffdu@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log (zjffdu) Date: Thu, 30 Apr 2015 04:24:24 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 5f63de8ee -> 3894c5ec6 TEZ-1521. 
VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3894c5ec Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3894c5ec Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3894c5ec Branch: refs/heads/master Commit: 3894c5ec6b707d7fff6381091fbbdf05c89f0f81 Parents: 5f63de8 Author: Jeff Zhang Authored: Thu Apr 30 11:15:53 2015 +0800 Committer: Jeff Zhang Committed: Thu Apr 30 11:15:53 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 11 +++-- .../tez/dag/app/dag/impl/TestVertexImpl.java | 50 ++++++++++++++++++++ 3 files changed, 57 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3894c5ec/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3a1867e..cfdc679 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -325,6 +325,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-1521. VertexDataMovementEventsGeneratedEvent may be logged twice in recovery log TEZ-2348. EOF exception during UnorderedKVReader.next(). TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in recovery. TEZ-2305. 
MR compatibility sleep job fails with IOException: Undefined job output-path http://git-wip-us.apache.org/repos/asf/tez/blob/3894c5ec/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index b63466a..c5de19b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1397,7 +1397,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, if (!pendingTaskEvents.isEmpty()) { LOG.info("Routing pending task events for vertex: " + logIdentifier); try { - handleRoutedTezEvents(this, pendingTaskEvents, false); + handleRoutedTezEvents(this, pendingTaskEvents, false, true); } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() +", vertex=" + logIdentifier; LOG.error(msg, e); @@ -3025,7 +3025,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.recoveredEvents.clear(); if (!vertex.pendingRouteEvents.isEmpty()) { try { - handleRoutedTezEvents(vertex, vertex.pendingRouteEvents, false); + handleRoutedTezEvents(vertex, vertex.pendingRouteEvents, false, true); vertex.pendingRouteEvents.clear(); } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier(); @@ -3284,7 +3284,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, List inputInfoEvents = iEvent.getEvents(); try { if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) { - VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false); + VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false, false); } } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", vertex:" + 
vertex.getLogIdentifier(); @@ -3941,7 +3941,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, boolean recovered = rEvent.isRecovered(); List tezEvents = rEvent.getEvents(); try { - VertexImpl.handleRoutedTezEvents(vertex, tezEvents, recovered); + VertexImpl.handleRoutedTezEvents(vertex, tezEvents, recovered, false); } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier(); LOG.error(msg, e); @@ -3960,9 +3960,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } - private static void handleRoutedTezEvents(VertexImpl vertex, List tezEvents, boolean recovered) throws AMUserCodeException { + private static void handleRoutedTezEvents(VertexImpl vertex, List tezEvents, boolean recovered, boolean isPendingEvents) throws AMUserCodeException { if (vertex.getAppContext().isRecoveryEnabled() && !recovered + && !isPendingEvents && !tezEvents.isEmpty()) { List recoveryEvents = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/tez/blob/3894c5ec/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 3147093..99ec6cf 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -145,7 +145,10 @@ import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; +import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEventHandler; +import 
org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.dag.records.TaskAttemptTerminationCause; @@ -182,6 +185,9 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.internal.util.collections.Sets; @@ -5528,6 +5534,50 @@ public class TestVertexImpl { Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause()); } + @Test (timeout = 5000) + public void testRouteEvent_RecoveredEvent() throws IOException { + doReturn(historyEventHandler).when(appContext).getHistoryHandler(); + doReturn(true).when(appContext).isRecoveryEnabled(); + + initAllVertices(VertexState.INITED); + VertexImpl v1 = (VertexImpl)vertices.get("vertex1"); + VertexImpl v2 = (VertexImpl)vertices.get("vertex2"); + VertexImpl v3 = (VertexImpl)vertices.get("vertex3"); + startVertex(v1); + startVertex(v2); + TezTaskID taskId = TezTaskID.getInstance(v1.getVertexId(), 0); + v1.handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED)); + DataMovementEvent dmEvent = DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])); + TezTaskAttemptID taId = TezTaskAttemptID.getInstance(taskId, 0); + TezEvent tezEvent1 = new TezEvent(dmEvent, new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", taId)); + v1.handle(new VertexEventRouteEvent(v1.getVertexId(), Lists.newArrayList(tezEvent1))); + dispatcher.await(); + assertTrue(v3.pendingTaskEvents.size() != 0); + ArgumentCaptor argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class); + verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture()); + 
verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1); + + v3.scheduleTasks(Lists.newArrayList(new TaskWithLocationHint(0, null))); + dispatcher.await(); + assertTrue(v3.pendingTaskEvents.size() == 0); + // the recovery event is logged only once; routing the pending events must not log it again + argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class); + verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture()); + verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1); + } + + private void verifyHistoryEvents(List events, HistoryEventType eventType, int expectedTimes) { + int actualTimes = 0; + LOG.info(""); + for (DAGHistoryEvent event : events) { + LOG.info(event.getHistoryEvent().getEventType() + ""); + if (event.getHistoryEvent().getEventType() == eventType) { + actualTimes ++; + } + } + Assert.assertEquals(actualTimes, expectedTimes); + } + + @InterfaceAudience.Private public static class RootInputSpecUpdaterVertexManager extends VertexManagerPlugin {