Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9571217B1C for ; Tue, 5 May 2015 18:56:18 +0000 (UTC) Received: (qmail 68273 invoked by uid 500); 5 May 2015 18:56:18 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 68237 invoked by uid 500); 5 May 2015 18:56:18 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 68228 invoked by uid 99); 5 May 2015 18:56:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 May 2015 18:56:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4C2BCE13CA; Tue, 5 May 2015 18:56:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Message-Id: <93ca469166ef431492e15ecb934df157@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2369. Add a few unit tests for RootInputInitializerManager. (sseth) Date: Tue, 5 May 2015 18:56:18 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 0a6a7d3b6 -> 8c44f2484 TEZ-2369. Add a few unit tests for RootInputInitializerManager. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8c44f248 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8c44f248 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8c44f248 Branch: refs/heads/master Commit: 8c44f2484b626ce9a7a3408bd3b0d7e0df2b1a24 Parents: 0a6a7d3 Author: Siddharth Seth Authored: Tue May 5 11:55:47 2015 -0700 Committer: Siddharth Seth Committed: Tue May 5 11:55:47 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../app/dag/RootInputInitializerManager.java | 1 + .../dag/TestRootInputInitializerManager.java | 201 +++++++++++++++++++ 3 files changed, 203 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8c44f248/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ba03aa3..d7a1e1f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -339,6 +339,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2369. Add a few unit tests for RootInputInitializerManager. TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. TEZ-2397. Translation of LocalResources via Tez plan serialization can be lossy. http://git-wip-us.apache.org/repos/asf/tez/blob/8c44f248/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index 7156e60..4ee00fa 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -425,6 +425,7 @@ public class RootInputInitializerManager { if (taskAttemptIndex == successfulAttempt) { toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent()); } + // Drop all other events which have the same source task Id. eventIterator.remove(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/8c44f248/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java new file mode 100644 index 0000000..89eb2a6 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java @@ -0,0 +1,201 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; + +import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.RootInputLeafOutput; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.InputInitializer; +import org.apache.tez.runtime.api.InputInitializerContext; +import org.apache.tez.runtime.api.events.InputInitializerEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class TestRootInputInitializerManager { + + // Simple testing. No events if task doesn't succeed. + // Also exercises path where two attempts are reported as successful via the stateChangeNotifier. + // Primarily a failure scenario, when a Task moves back to running from success + // Order event1, success1, event2, success2 + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testEventBeforeSuccess() throws Exception { + InputDescriptor id = mock(InputDescriptor.class); + InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class); + RootInputLeafOutput rootInput = + new RootInputLeafOutput("InputName", id, iid); + + InputInitializer initializer = mock(InputInitializer.class); + InputInitializerContext initializerContext = mock(InputInitializerContext.class); + Vertex vertex = mock(Vertex.class); + StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class); + AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + + RootInputInitializerManager.InitializerWrapper initializerWrapper = + new RootInputInitializerManager.InitializerWrapper(rootInput, initializer, + initializerContext, vertex, stateChangeNotifier, appContext); + + ApplicationId appId = ApplicationId.newInstance(1000, 1); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2); + TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3); + Vertex srcVertex = mock(Vertex.class); + Task srcTask1 = mock(Task.class); + doReturn(TaskState.RUNNING).when(srcTask1).getState(); + doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId()); + when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex); + + String srcVertexName = "srcVertexName"; + List eventList = Lists.newLinkedList(); + + + // First Attempt send event + TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1); + EventMetaData sourceInfo11 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId11); + InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te1 = new TezEvent(e1, sourceInfo11); + eventList.add(te1); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + eventList.clear(); + + // First attempt, Task success notification + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId()); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(List.class); + verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture()); + List invokedEvents = argumentCaptor.getValue(); + assertEquals(1, invokedEvents.size()); + + reset(initializer); + + // 2nd attempt send event + TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2); + EventMetaData sourceInfo12 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId12); + InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te2 = new TezEvent(e2, sourceInfo12); + eventList.add(te2); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + eventList.clear(); + reset(initializer); + + // 2nd attempt succeeded + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId()); + verify(initializer, never()).handleInputInitializerEvent(argumentCaptor.capture()); + } + + // Order event1 success1, success2, event2 + // Primarily a failure scenario, when a Task moves back to running from success + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testSuccessBeforeEvent() throws Exception { + InputDescriptor id = mock(InputDescriptor.class); + InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class); + RootInputLeafOutput rootInput = + new RootInputLeafOutput("InputName", id, iid); + + InputInitializer initializer = mock(InputInitializer.class); + InputInitializerContext initializerContext = mock(InputInitializerContext.class); + Vertex vertex = mock(Vertex.class); + StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class); + AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + + RootInputInitializerManager.InitializerWrapper initializerWrapper = + new RootInputInitializerManager.InitializerWrapper(rootInput, initializer, + initializerContext, vertex, stateChangeNotifier, appContext); + + ApplicationId appId = ApplicationId.newInstance(1000, 1); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2); + TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3); + Vertex srcVertex = mock(Vertex.class); + Task srcTask1 = mock(Task.class); + doReturn(TaskState.RUNNING).when(srcTask1).getState(); + doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId()); + when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex); + + String srcVertexName = "srcVertexName"; + List eventList = Lists.newLinkedList(); + + + // First Attempt send event + TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1); + EventMetaData sourceInfo11 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId11); + InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te1 = new TezEvent(e1, sourceInfo11); + eventList.add(te1); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + eventList.clear(); + + // First attempt, Task success notification + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId()); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(List.class); + verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture()); + List invokedEvents = argumentCaptor.getValue(); + assertEquals(1, invokedEvents.size()); + + reset(initializer); + + + TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2); + // 2nd attempt succeeded + initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId()); + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + + // 2nd attempt send event + EventMetaData sourceInfo12 = + new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null, + srcTaskAttemptId12); + InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null); + TezEvent te2 = new TezEvent(e2, sourceInfo12); + eventList.add(te2); + initializerWrapper.handleInputInitializerEvents(eventList); + + verify(initializer, never()).handleInputInitializerEvent(any(List.class)); + } +}