tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport a findbugs warning fix from master. (sseth) (cherry picked from commit 4b65376991ae7c64aab10e50848814d7cb848cd8)
Date Tue, 05 May 2015 18:58:54 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 bac43ea84 -> f1ab8dc0c


TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport
a findbugs warning fix from master. (sseth)
(cherry picked from commit 4b65376991ae7c64aab10e50848814d7cb848cd8)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f1ab8dc0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f1ab8dc0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f1ab8dc0

Branch: refs/heads/branch-0.5
Commit: f1ab8dc0c5fa8df4811f64584ee1dfab38ddc981
Parents: bac43ea
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue May 5 11:57:42 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue May 5 11:58:37 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../app/dag/RootInputInitializerManager.java    |   3 +-
 .../dag/TestRootInputInitializerManager.java    | 201 +++++++++++++++++++
 3 files changed, 204 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f1ab8dc0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e21a6fd..b752289 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport a findbugs warning fix from master.
   TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
     Invalid event: T_ATTEMPT_KILLED at KILLED.
   TEZ-2397. Translation of LocalResources via Tez plan serialization can be lossy.

http://git-wip-us.apache.org/repos/asf/tez/blob/f1ab8dc0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index bdd3689..84379e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -405,7 +405,7 @@ public class RootInputInitializerManager {
            "AttemptId is -1. This is likely caused by TEZ-1577; recovery not supported when InputInitializerEvents are used");
       }
       Map<Integer, Integer> vertexSuccessfulAttemptMap = firstSuccessfulAttemptMap.get(vertexName);
-      Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId);
+      Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId.getId());
       if (successfulAttempt == null) {
         successfulAttempt = attemptId;
         vertexSuccessfulAttemptMap.put(taskId.getId(), successfulAttempt);
@@ -425,6 +425,7 @@ public class RootInputInitializerManager {
             if (taskAttemptIndex == successfulAttempt) {
               toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent());
             }
+            // Drop all other events which have the same source task Id.
             eventIterator.remove();
           }
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/f1ab8dc0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
new file mode 100644
index 0000000..89eb2a6
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestRootInputInitializerManager {
+
+  // Simple testing. No events if task doesn't succeed.
+  // Also exercises path where two attempts are reported as successful via the stateChangeNotifier.
+  // Primarily a failure scenario, when a Task moves back to running from success
+  // Order event1, success1, event2, success2
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testEventBeforeSuccess() throws Exception {
+    InputDescriptor id = mock(InputDescriptor.class);
+    InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class);
+    RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput =
+        new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>("InputName", id, iid);
+
+    InputInitializer initializer = mock(InputInitializer.class);
+    InputInitializerContext initializerContext = mock(InputInitializerContext.class);
+    Vertex vertex = mock(Vertex.class);
+    StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
+    AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+
+    RootInputInitializerManager.InitializerWrapper initializerWrapper =
+        new RootInputInitializerManager.InitializerWrapper(rootInput, initializer,
+            initializerContext, vertex, stateChangeNotifier, appContext);
+
+    ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2);
+    TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3);
+    Vertex srcVertex = mock(Vertex.class);
+    Task srcTask1 = mock(Task.class);
+    doReturn(TaskState.RUNNING).when(srcTask1).getState();
+    doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId());
+    when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex);
+
+    String srcVertexName = "srcVertexName";
+    List<TezEvent> eventList = Lists.newLinkedList();
+
+
+    // First Attempt send event
+    TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1);
+    EventMetaData sourceInfo11 =
+        new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+            srcTaskAttemptId11);
+    InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+    TezEvent te1 = new TezEvent(e1, sourceInfo11);
+    eventList.add(te1);
+    initializerWrapper.handleInputInitializerEvents(eventList);
+
+    verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+    eventList.clear();
+
+    // First attempt, Task success notification
+    initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId());
+    ArgumentCaptor<List> argumentCaptor = ArgumentCaptor.forClass(List.class);
+    verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture());
+    List<InputInitializerEvent> invokedEvents = argumentCaptor.getValue();
+    assertEquals(1, invokedEvents.size());
+
+    reset(initializer);
+
+    // 2nd attempt send event
+    TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2);
+    EventMetaData sourceInfo12 =
+        new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+            srcTaskAttemptId12);
+    InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+    TezEvent te2 = new TezEvent(e2, sourceInfo12);
+    eventList.add(te2);
+    initializerWrapper.handleInputInitializerEvents(eventList);
+
+    verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+    eventList.clear();
+    reset(initializer);
+
+    // 2nd attempt succeeded
+    initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId());
+    verify(initializer, never()).handleInputInitializerEvent(argumentCaptor.capture());
+  }
+
+  // Order event1 success1, success2, event2
+  // Primarily a failure scenario, when a Task moves back to running from success
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testSuccessBeforeEvent() throws Exception {
+    InputDescriptor id = mock(InputDescriptor.class);
+    InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class);
+    RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput =
+        new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>("InputName", id, iid);
+
+    InputInitializer initializer = mock(InputInitializer.class);
+    InputInitializerContext initializerContext = mock(InputInitializerContext.class);
+    Vertex vertex = mock(Vertex.class);
+    StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
+    AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+
+    RootInputInitializerManager.InitializerWrapper initializerWrapper =
+        new RootInputInitializerManager.InitializerWrapper(rootInput, initializer,
+            initializerContext, vertex, stateChangeNotifier, appContext);
+
+    ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2);
+    TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3);
+    Vertex srcVertex = mock(Vertex.class);
+    Task srcTask1 = mock(Task.class);
+    doReturn(TaskState.RUNNING).when(srcTask1).getState();
+    doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId());
+    when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex);
+
+    String srcVertexName = "srcVertexName";
+    List<TezEvent> eventList = Lists.newLinkedList();
+
+
+    // First Attempt send event
+    TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1);
+    EventMetaData sourceInfo11 =
+        new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+            srcTaskAttemptId11);
+    InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+    TezEvent te1 = new TezEvent(e1, sourceInfo11);
+    eventList.add(te1);
+    initializerWrapper.handleInputInitializerEvents(eventList);
+
+    verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+    eventList.clear();
+
+    // First attempt, Task success notification
+    initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId());
+    ArgumentCaptor<List> argumentCaptor = ArgumentCaptor.forClass(List.class);
+    verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture());
+    List<InputInitializerEvent> invokedEvents = argumentCaptor.getValue();
+    assertEquals(1, invokedEvents.size());
+
+    reset(initializer);
+
+
+    TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2);
+    // 2nd attempt succeeded
+    initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId());
+    verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+
+    // 2nd attempt send event
+    EventMetaData sourceInfo12 =
+        new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+            srcTaskAttemptId12);
+    InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+    TezEvent te2 = new TezEvent(e2, sourceInfo12);
+    eventList.add(te2);
+    initializerWrapper.handleInputInitializerEvents(eventList);
+
+    verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+  }
+}


Mime
View raw message