Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 926A417B63 for ; Tue, 5 May 2015 18:58:54 +0000 (UTC) Received: (qmail 72489 invoked by uid 500); 5 May 2015 18:58:54 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 72453 invoked by uid 500); 5 May 2015 18:58:54 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 72444 invoked by uid 99); 5 May 2015 18:58:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 May 2015 18:58:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 52115E13CA; Tue, 5 May 2015 18:58:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport a findbugs warning fix from master. (sseth) (cherry picked from commit 4b65376991ae7c64aab10e50848814d7cb848cd8) Date: Tue, 5 May 2015 18:58:54 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.5 bac43ea84 -> f1ab8dc0c TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport a findbugs warning fix from master. (sseth) (cherry picked from commit 4b65376991ae7c64aab10e50848814d7cb848cd8) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f1ab8dc0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f1ab8dc0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f1ab8dc0 Branch: refs/heads/branch-0.5 Commit: f1ab8dc0c5fa8df4811f64584ee1dfab38ddc981 Parents: bac43ea Author: Siddharth Seth Authored: Tue May 5 11:57:42 2015 -0700 Committer: Siddharth Seth Committed: Tue May 5 11:58:37 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../app/dag/RootInputInitializerManager.java | 3 +- .../dag/TestRootInputInitializerManager.java | 201 +++++++++++++++++++ 3 files changed, 204 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f1ab8dc0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e21a6fd..b752289 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport a findbugs warning fix from master. TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. TEZ-2397. Translation of LocalResources via Tez plan serialization can be lossy. http://git-wip-us.apache.org/repos/asf/tez/blob/f1ab8dc0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index bdd3689..84379e6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -405,7 +405,7 @@ public class RootInputInitializerManager { "AttemptId is -1. This is likely caused by TEZ-1577; recovery not supported when InputInitializerEvents are used"); } Map vertexSuccessfulAttemptMap = firstSuccessfulAttemptMap.get(vertexName); - Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId); + Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId.getId()); if (successfulAttempt == null) { successfulAttempt = attemptId; vertexSuccessfulAttemptMap.put(taskId.getId(), successfulAttempt); @@ -425,6 +425,7 @@ public class RootInputInitializerManager { if (taskAttemptIndex == successfulAttempt) { toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent()); } + // Drop all other events which have the same source task Id. eventIterator.remove(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/f1ab8dc0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java new file mode 100644 index 0000000..89eb2a6 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java @@ -0,0 +1,201 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; + +import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.RootInputLeafOutput; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.InputInitializer; +import org.apache.tez.runtime.api.InputInitializerContext; +import org.apache.tez.runtime.api.events.InputInitializerEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class TestRootInputInitializerManager { + + // Simple testing. No events if task doesn't succeed. + // Also exercises path where two attempts are reported as successful via the stateChangeNotifier. + // Primarily a failure scenario, when a Task moves back to running from success + // Order event1, success1, event2, success2 + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testEventBeforeSuccess() throws Exception { + InputDescriptor id = mock(InputDescriptor.class); + InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class); + RootInputLeafOutput rootInput = + new RootInputLeafOutput("InputName", id, iid); + + InputInitializer initializer = mock(InputInitializer.class); + InputInitializerContext initializerContext = mock(InputInitializerContext.class); + Vertex vertex = mock(Vertex.class); + StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class); + AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + + RootInputInitializerManager.InitializerWrapper initializerWrapper = + new RootInputInitializerManager.InitializerWrapper(rootInput, initializer, + initializerContext, vertex, stateChangeNotifier, appContext); + + ApplicationId appId = ApplicationId.newInstance(1000, 1); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2); + TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3); + Vertex srcVertex = mock(Vertex.class); + Task srcTask1 = mock(Task.class); + doReturn(TaskState.RUNNING).when(srcTask1).getState(); + doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId()); + when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex); + + String srcVertexName = "srcVertexName"; + List eventList = Lists.newLinkedList(); + + + // First Attempt send event + TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1); + EventMetaData sourceInfo11 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId11); + InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te1 = new TezEvent(e1, sourceInfo11); + eventList.add(te1); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + eventList.clear(); + + // First attempt, Task success notification + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId()); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(List.class); + verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture()); + List invokedEvents = argumentCaptor.getValue(); + assertEquals(1, invokedEvents.size()); + + reset(initializer); + + // 2nd attempt send event + TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2); + EventMetaData sourceInfo12 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId12); + InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te2 = new TezEvent(e2, sourceInfo12); + eventList.add(te2); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + eventList.clear(); + reset(initializer); + + // 2nd attempt succeeded + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId()); + verify(initializer, never()).handleInputInitializerEvent(argumentCaptor.capture()); + } + + // Order event1 success1, success2, event2 + // Primarily a failure scenario, when a Task moves back to running from success + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testSuccessBeforeEvent() throws Exception { + InputDescriptor id = mock(InputDescriptor.class); + InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class); + RootInputLeafOutput rootInput = + new RootInputLeafOutput("InputName", id, iid); + + InputInitializer initializer = mock(InputInitializer.class); + InputInitializerContext initializerContext = mock(InputInitializerContext.class); + Vertex vertex = mock(Vertex.class); + StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class); + AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + + RootInputInitializerManager.InitializerWrapper initializerWrapper = + new RootInputInitializerManager.InitializerWrapper(rootInput, initializer, + initializerContext, vertex, stateChangeNotifier, appContext); + + ApplicationId appId = ApplicationId.newInstance(1000, 1); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2); + TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3); + Vertex srcVertex = mock(Vertex.class); + Task srcTask1 = mock(Task.class); + doReturn(TaskState.RUNNING).when(srcTask1).getState(); + doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId()); + when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex); + + String srcVertexName = "srcVertexName"; + List eventList = Lists.newLinkedList(); + + + // First Attempt send event + TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1); + EventMetaData sourceInfo11 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId11); + InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te1 = new TezEvent(e1, sourceInfo11); + eventList.add(te1); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + eventList.clear(); + + // First attempt, Task success notification + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId()); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(List.class); + verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture()); + List invokedEvents = argumentCaptor.getValue(); + assertEquals(1, invokedEvents.size()); + + reset(initializer); + + + TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2); + // 2nd attempt succeeded + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId()); + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + + // 2nd attempt send event + EventMetaData sourceInfo12 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId12); + InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te2 = new TezEvent(e2, sourceInfo12); + eventList.add(te2); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + } +}