tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [42/50] [abbrv] git commit: TEZ-26. Create tests for Vertex Scheduler (bikas)
Date Tue, 04 Jun 2013 05:33:44 GMT
TEZ-26. Create tests for Vertex Scheduler (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a3e2c2f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a3e2c2f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a3e2c2f5

Branch: refs/heads/master
Commit: a3e2c2f57af79dc6d5b84b07c699e3d96a64758c
Parents: d94d37a
Author: Bikas Saha <bikas@apache.org>
Authored: Sat Jun 1 00:00:06 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Sat Jun 1 00:00:06 2013 -0700

----------------------------------------------------------------------
 .../impl/BipartiteSlowStartVertexScheduler.java    |    8 +-
 .../tez/dag/app/dag/impl/TestDAGScheduler.java     |    4 +-
 .../tez/dag/app/dag/impl/TestVertexScheduler.java  |  181 +++++++++++++++
 3 files changed, 187 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a3e2c2f5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BipartiteSlowStartVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BipartiteSlowStartVertexScheduler.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BipartiteSlowStartVertexScheduler.java
index 6c7bb0f..0321c10 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BipartiteSlowStartVertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BipartiteSlowStartVertexScheduler.java
@@ -49,7 +49,6 @@ public class BipartiteSlowStartVertexScheduler implements VertexScheduler
{
   
   int numSourceTasks = 0;
   int numSourceTasksCompleted = 0;
-  boolean slowStartThresholdReached = false;
   ArrayList<TezTaskID> pendingTasks;
   int totalTasksToSchedule = 0;
   HashMap<TezVertexID, Vertex> bipartiteSources = 
@@ -84,7 +83,6 @@ public class BipartiteSlowStartVertexScheduler implements VertexScheduler
{
   
   @Override
   public void onVertexStarted() {
-    //targetVertex.scheduleTasks(targetVertex.getTasks().keySet()); 
     pendingTasks = new ArrayList<TezTaskID>(managedVertex.getTotalTasks());
     // track the tasks in this vertex
     pendingTasks.addAll(managedVertex.getTasks().keySet());
@@ -140,7 +138,7 @@ public class BipartiteSlowStartVertexScheduler implements VertexScheduler
{
     }
 
     float completedSourceTaskFraction = 0f;
-    if (numSourceTasks != 0) {//support for 0 source tasks
+    if (numSourceTasks != 0) { // support for 0 source tasks
       completedSourceTaskFraction = (float)numSourceTasksCompleted/numSourceTasks;
     } else {
       completedSourceTaskFraction = 1;
@@ -180,7 +178,9 @@ public class BipartiteSlowStartVertexScheduler implements VertexScheduler
{
                managedVertex.getVertexId() + " with totalTasks: " + 
                totalTasksToSchedule + ". " + numSourceTasksCompleted + 
                " source tasks completed out of " + numSourceTasks + 
-               ". SourceTaskCompletedFraction: " + completedSourceTaskFraction);
+               ". SourceTaskCompletedFraction: " + completedSourceTaskFraction + 
+               " min: " + slowStartMinSrcCompletionFraction + 
+               " max: " + slowStartMaxSrcCompletionFraction);
       schedulePendingTasks(numTasksToSchedule);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a3e2c2f5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index 988a116..36c31a4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -45,7 +45,7 @@ public class TestDAGScheduler {
   
   MockEventHandler mockEventHandler = new MockEventHandler();
   
-  //@Test(timeout=10000)
+  @Test(timeout=10000)
   public void testDAGSchedulerNaturalOrder() {
     DAG mockDag = mock(DAG.class);
     Vertex mockVertex = mock(Vertex.class);
@@ -72,7 +72,7 @@ public class TestDAGScheduler {
     Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 5);
   }
   
-  @Test//(timeout=10000)
+  @Test(timeout=10000)
   public void testDAGSchedulerMRR() {
     DAG mockDag = mock(DAG.class);
     TezDAGID dagId = new TezDAGID("1", 1, 1);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a3e2c2f5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
new file mode 100644
index 0000000..3ebb653
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.*;
+
+public class TestVertexScheduler { // unit test for BipartiteSlowStartVertexScheduler
+
+  @SuppressWarnings({ "unchecked", "rawtypes" }) // raw Answer and unchecked Collection cast below
+  @Test
+  public void testBipartiteSlowStartVertexScheduler() {
+    BipartiteSlowStartVertexScheduler scheduler = null;
+    TezDAGID dagId = new TezDAGID("1", 1, 1);
+    HashMap<Vertex, EdgeProperty> mockInputVertices = 
+        new HashMap<Vertex, EdgeProperty>();
+    Vertex mockSrcVertex1 = mock(Vertex.class); // first bipartite source
+    TezVertexID mockSrcVertexId1 = new TezVertexID(dagId, 1);
+    EdgeProperty eProp1 = new EdgeProperty(
+        EdgeProperty.ConnectionPattern.BIPARTITE,
+        EdgeProperty.SourceType.STABLE, "in", "out");
+    when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1);
+    Vertex mockSrcVertex2 = mock(Vertex.class); // second bipartite source
+    TezVertexID mockSrcVertexId2 = new TezVertexID(dagId, 2);
+    EdgeProperty eProp2 = new EdgeProperty(
+        EdgeProperty.ConnectionPattern.BIPARTITE,
+        EdgeProperty.SourceType.STABLE, "in", "out");
+    when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2);
+    Vertex mockSrcVertex3 = mock(Vertex.class); // the only non-bipartite source
+    TezVertexID mockSrcVertexId3 = new TezVertexID(dagId, 3);
+    EdgeProperty eProp3 = new EdgeProperty(
+        EdgeProperty.ConnectionPattern.ONE_TO_ALL,
+        EdgeProperty.SourceType.STABLE, "in", "out");
+    when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3);
+    
+    Vertex mockManagedVertex = mock(Vertex.class); // the vertex whose tasks the scheduler manages
+    TezVertexID mockManagedVertexId = new TezVertexID(dagId, 3); // NOTE(review): same id value (3) as mockSrcVertexId3 - confirm this is intended
+    when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
+    when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
+
+    // construction must fail when there is no bipartite source vertex
+    mockInputVertices.put(mockSrcVertex3, eProp3);
+    try {
+     scheduler = new BipartiteSlowStartVertexScheduler(mockManagedVertex, 0, 0);
+     Assert.assertFalse(true); // reached only if the constructor did not throw
+    } catch (TezException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "Atleast 1 bipartite source should exist"));
+    }
+    
+    mockInputVertices.put(mockSrcVertex1, eProp1); // now add the two bipartite sources
+    mockInputVertices.put(mockSrcVertex2, eProp2);
+    
+    // check initialization: exactly the two bipartite sources are tracked
+    scheduler = 
+        new BipartiteSlowStartVertexScheduler(mockManagedVertex, 0.1f, 0.1f);
+    Assert.assertTrue(scheduler.bipartiteSources.size() == 2);
+    Assert.assertTrue(scheduler.bipartiteSources.containsKey(mockSrcVertexId1));
+    Assert.assertTrue(scheduler.bipartiteSources.containsKey(mockSrcVertexId2));
+    
+    HashMap<TezTaskID, Task> managedTasks = new HashMap<TezTaskID, Task>(); // three task ids mapped to null values
+    TezTaskID mockTaskId1 = new TezTaskID(mockManagedVertexId, 0);
+    managedTasks.put(mockTaskId1, null);
+    TezTaskID mockTaskId2 = new TezTaskID(mockManagedVertexId, 1);
+    managedTasks.put(mockTaskId2, null);
+    TezTaskID mockTaskId3 = new TezTaskID(mockManagedVertexId, 2);
+    managedTasks.put(mockTaskId3, null);
+    
+    when(mockManagedVertex.getTotalTasks()).thenReturn(3);
+    when(mockManagedVertex.getTasks()).thenReturn(managedTasks);
+    
+    final HashSet<TezTaskID> scheduledTasks = new HashSet<TezTaskID>(); // holds the argument of the most recent scheduleTasks() call only
+    doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+          Object[] args = invocation.getArguments();
+          scheduledTasks.clear(); // forget whatever an earlier call recorded
+          scheduledTasks.addAll((Collection<TezTaskID>)args[0]); 
+          return null;
+      }}).when(mockManagedVertex).scheduleTasks(anyCollection());
+    
+    // source vertices have 0 tasks: immediate start of all managed tasks
+    when(mockSrcVertex1.getTotalTasks()).thenReturn(0);
+    when(mockSrcVertex2.getTotalTasks()).thenReturn(0);
+    scheduler.onVertexStarted();
+    Assert.assertTrue(scheduler.pendingTasks.isEmpty());
+    Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
+    
+    when(mockSrcVertex1.getTotalTasks()).thenReturn(2); // 2 + 2 = 4 bipartite source tasks from here on
+    when(mockSrcVertex2.getTotalTasks()).thenReturn(2);
+    
+    // source vertices have some tasks. min == max == 0
+    scheduler = new BipartiteSlowStartVertexScheduler(mockManagedVertex, 0, 0);
+    scheduler.onVertexStarted();
+    Assert.assertTrue(scheduler.numSourceTasks == 4);
+    Assert.assertTrue(scheduler.totalTasksToSchedule == 3);
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 0);
+    Assert.assertTrue(scheduler.pendingTasks.isEmpty());
+    Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
+    
+    TezTaskAttemptID mockSrcAttemptId11 = 
+        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId1, 0), 0);
+    TezTaskAttemptID mockSrcAttemptId12 = 
+        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId1, 0), 1);
+    TezTaskAttemptID mockSrcAttemptId21 = 
+        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId2, 0), 0);
+    TezTaskAttemptID mockSrcAttemptId22 = 
+        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId2, 0), 1);
+    TezTaskAttemptID mockSrcAttemptId31 = 
+        new TezTaskAttemptID(new TezTaskID(mockSrcVertexId3, 0), 0);
+    
+    // min, max > 0 and min == max
+    scheduler = 
+        new BipartiteSlowStartVertexScheduler(mockManagedVertex, 0.25f, 0.25f);
+    scheduler.onVertexStarted();
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(scheduler.numSourceTasks == 4);
+    // task completion from the non-bipartite source vertex does nothing
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId31);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(scheduler.numSourceTasks == 4);
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 0);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId11); // 1 of 4 completed
+    Assert.assertTrue(scheduler.pendingTasks.isEmpty());
+    Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 1);
+    
+    // min, max > 0 and min < max
+    scheduler = 
+        new BipartiteSlowStartVertexScheduler(mockManagedVertex, 0.25f, 0.75f);
+    scheduler.onVertexStarted();
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(scheduler.numSourceTasks == 4);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 2);
+    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task in the latest scheduleTasks() call
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 0);
+    Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks in the latest scheduleTasks() call
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
+    scheduledTasks.clear(); // reset so a further scheduleTasks() call would be visible
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId22); // we are done. no action
+    Assert.assertTrue(scheduler.pendingTasks.size() == 0);
+    Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
+
+  }
+}


Mime
View raw message