Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C3EC6200BE4 for ; Tue, 6 Dec 2016 18:06:38 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C2A59160B2E; Tue, 6 Dec 2016 17:06:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AF3D8160B2F for ; Tue, 6 Dec 2016 18:06:36 +0100 (CET) Received: (qmail 67287 invoked by uid 500); 6 Dec 2016 17:06:35 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 65712 invoked by uid 99); 6 Dec 2016 17:06:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Dec 2016 17:06:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7A6DCE9411; Tue, 6 Dec 2016 17:06:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jeagles@apache.org To: commits@tez.apache.org Date: Tue, 06 Dec 2016 17:07:12 -0000 Message-Id: In-Reply-To: <1e58e745d6be419abfba4578bc4073fe@git.apache.org> References: <1e58e745d6be419abfba4578bc4073fe@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [41/50] [abbrv] tez git commit: TEZ-3269. Provide basic fair routing and scheduling functionality via custom VertexManager and EdgeManager. archived-at: Tue, 06 Dec 2016 17:06:38 -0000 http://git-wip-us.apache.org/repos/asf/tez/blob/2c4ef9fe/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index ad4cceb..b824d0b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -19,291 +19,60 @@ package org.apache.tez.dag.library.vertexmanager; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.common.ReflectionUtils; -import org.apache.tez.common.TezCommonUtils; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.EdgeManagerPlugin; -import org.apache.tez.dag.api.EdgeManagerPluginContext; -import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; -import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.*; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; -import org.apache.tez.dag.api.InputDescriptor; -import org.apache.tez.dag.api.OutputDescriptor; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.dag.api.VertexLocationHint; -import org.apache.tez.dag.api.VertexManagerPluginContext; -import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; -import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; -import org.apache.tez.dag.records.TaskAttemptIdentifierImpl; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.TaskAttemptIdentifier; -import org.apache.tez.runtime.api.TaskIdentifier; -import org.apache.tez.runtime.api.VertexIdentifier; -import org.apache.tez.runtime.api.VertexStatistics; -import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; -import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; -import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.roaringbitmap.RoaringBitmap; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyList; import static org.mockito.Mockito.anyMap; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @SuppressWarnings({ "unchecked", "rawtypes" }) -@RunWith(Parameterized.class) -public class TestShuffleVertexManager { +public class TestShuffleVertexManager extends TestShuffleVertexManagerUtils { - TezVertexID vertexId = TezVertexID.fromString("vertex_1436907267600_195589_1_00"); - int taskId = 0; List emptyCompletions = null; - Class shuffleVertexManagerClass; - - @SuppressWarnings("deprecation") - @Parameterized.Parameters(name = "test[{0}]") - public static Collection data() { - Object[][] data = new Object[][]{ - {ShuffleVertexManager.class}}; - return Arrays.asList(data); - } - - public TestShuffleVertexManager( - Class shuffleVertexManagerClass) { - this.shuffleVertexManagerClass = shuffleVertexManagerClass; - } @Test(timeout = 5000) - public void testShuffleVertexManagerAutoParallelism() throws Exception { + public void testLargeDataSize() throws IOException { Configuration conf = new Configuration(); - ShuffleVertexManagerBase manager = null; + ShuffleVertexManagerBase manager; - HashMap mockInputVertices = - new HashMap(); - String mockSrcVertexId1 = "Vertex1"; - EdgeProperty eProp1 = EdgeProperty.create( - EdgeProperty.DataMovementType.SCATTER_GATHER, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - String mockSrcVertexId2 = "Vertex2"; - EdgeProperty eProp2 = EdgeProperty.create( - EdgeProperty.DataMovementType.SCATTER_GATHER, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - String mockSrcVertexId3 = "Vertex3"; - EdgeProperty eProp3 = EdgeProperty.create( - EdgeProperty.DataMovementType.BROADCAST, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - + final String mockSrcVertexId1 = "Vertex1"; + final String mockSrcVertexId2 = "Vertex2"; + final String mockSrcVertexId3 = "Vertex3"; final String mockManagedVertexId = "Vertex4"; - - mockInputVertices.put(mockSrcVertexId1, eProp1); - mockInputVertices.put(mockSrcVertexId2, eProp2); - mockInputVertices.put(mockSrcVertexId3, eProp3); - - final VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); - when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); - when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); - - //Check via setters - ShuffleVertexManager.ShuffleVertexManagerConfigBuilder configurer = ShuffleVertexManager - .createConfigBuilder(null); - VertexManagerPluginDescriptor pluginDesc = configurer.setAutoReduceParallelism(true) - .setDesiredTaskInputSize(1000l) - .setMinTaskParallelism(10).setSlowStartMaxSrcCompletionFraction(0.5f).build(); - when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload()); - - - manager = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(), - new Class[]{VertexManagerPluginContext.class}, new Object[]{mockContext}); - manager.initialize(); - verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez notified of reconfig - - Assert.assertTrue(manager.config.isAutoParallelismEnabled() == true); - Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() == 1000l); - if (manager instanceof ShuffleVertexManager) { - Assert.assertTrue(((ShuffleVertexManager)manager).mgrConfig. - getMinTaskParallelism() == 10); - } - Assert.assertTrue(manager.config.getMinFraction() == 0.25f); - Assert.assertTrue(manager.config.getMaxFraction() == 0.5f); - - configurer = ShuffleVertexManager.createConfigBuilder(null); - pluginDesc = configurer.setAutoReduceParallelism(false).build(); - when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload()); - manager = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(), - new Class[]{VertexManagerPluginContext.class}, new Object[]{mockContext}); - manager.initialize(); - verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez not notified of reconfig - - Assert.assertTrue(manager.config.isAutoParallelismEnabled() == false); - Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() == - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT); - if (manager instanceof ShuffleVertexManager) { - Assert.assertTrue(((ShuffleVertexManager)manager).mgrConfig. - getMinTaskParallelism() == 1); - } - Assert.assertTrue(manager.config.getMinFraction() == - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT); - Assert.assertTrue(manager.config.getMaxFraction() == - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT); - - - final HashSet scheduledTasks = new HashSet(); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - scheduledTasks.clear(); - List tasks = (List)args[0]; - for (ScheduleTaskRequest task : tasks) { - scheduledTasks.add(task.getTaskIndex()); - } - return null; - }}).when(mockContext).scheduleTasks(anyList()); - + final List scheduledTasks = Lists.newLinkedList(); final Map newEdgeManagers = new HashMap(); - - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) throws Exception { - final int numTasks = ((Integer)invocation.getArguments()[0]).intValue(); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(numTasks); - newEdgeManagers.clear(); - for (Entry entry : - ((Map)invocation.getArguments()[2]).entrySet()) { - - EdgeManagerPluginDescriptor pluginDesc = entry.getValue().getEdgeManagerDescriptor(); - final UserPayload userPayload = pluginDesc.getUserPayload(); - EdgeManagerPluginContext emContext = new EdgeManagerPluginContext() { - @Override - public UserPayload getUserPayload() { - return userPayload == null ? null : userPayload; - } - - @Override - public String getSourceVertexName() { - return null; - } - - @Override - public String getDestinationVertexName() { - return null; - } - - @Override - public int getSourceVertexNumTasks() { - return 2; - } - - @Override - public int getDestinationVertexNumTasks() { - return numTasks; - } - }; - EdgeManagerPlugin edgeManager = ReflectionUtils - .createClazzInstance(pluginDesc.getClassName(), - new Class[]{EdgeManagerPluginContext.class}, new Object[]{emContext}); - edgeManager.initialize(); - newEdgeManagers.put(entry.getKey(), edgeManager); - } - return null; - }}).when(mockContext).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); - - // check initialization - manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig - // source vertices have 0 tasks. - when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0); - when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0); - when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1); - - manager.onVertexStarted(emptyCompletions); - verify(mockContext, times(2)).vertexReconfigurationPlanned(); - Assert.assertTrue(manager.bipartiteSources == 2); - - // check waiting for notification before scheduling - Assert.assertFalse(manager.pendingTasks.isEmpty()); - // source vertices have 0 tasks. triggers scheduling - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertTrue(manager.pendingTasks.isEmpty()); - verify(mockContext, times(1)).reconfigureVertex(eq(1), any - (VertexLocationHint.class), anyMap()); - verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done - Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed - scheduledTasks.clear(); - // TODO TEZ-1714 locking verify(mockContext, times(1)).vertexManagerDone(); // notified after scheduling all tasks - - // check scheduling only after onVertexStarted - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); - manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig - verify(mockContext, times(3)).vertexReconfigurationPlanned(); - // source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling - // normally this event will not come before onVertexStarted() is called - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start - Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled - // trigger start and processing of pending notification events - manager.onVertexStarted(emptyCompletions); - Assert.assertTrue(manager.bipartiteSources == 2); - verify(mockContext, times(2)).reconfigureVertex(eq(1), any - (VertexLocationHint.class), anyMap()); - verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done - Assert.assertTrue(manager.pendingTasks.isEmpty()); - Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed - - - when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); - when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); + final VertexManagerPluginContext mockContext = createVertexManagerContext( + mockSrcVertexId1, 2, mockSrcVertexId2, 2, mockSrcVertexId3, 2, + mockManagedVertexId, 4, scheduledTasks, newEdgeManagers); VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, mockSrcVertexId1); // parallelism not change due to large data size manager = createManager(conf, mockContext, 0.1f, 0.1f); - verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig + verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez notified of reconfig manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled manager.onVertexManagerEventReceived(vmEvent); @@ -312,210 +81,26 @@ public class TestShuffleVertexManager { manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - verify(mockContext, times(2)).reconfigureVertex(anyInt(), any + verify(mockContext, times(0)).reconfigureVertex(anyInt(), any (VertexLocationHint.class), anyMap()); - verify(mockContext, times(2)).doneReconfiguringVertex(); + verify(mockContext, times(0)).doneReconfiguringVertex(); // trigger scheduling manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - verify(mockContext, times(2)).reconfigureVertex(anyInt(), any + verify(mockContext, times(0)).reconfigureVertex(anyInt(), any (VertexLocationHint.class), anyMap()); - verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done + verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled Assert.assertEquals(4, scheduledTasks.size()); // TODO TEZ-1714 locking verify(mockContext, times(2)).vertexManagerDone(); // notified after scheduling all tasks Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted); Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize); - - /** - * Test vmEvent and vertexStatusUpdate before started - */ - scheduledTasks.clear(); - //{5,9,12,18} in bitmap - vmEvent = getVertexManagerEvent(null, 1L, "Vertex"); - - manager = createManager(conf, mockContext, 0.01f, 0.75f); - Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - - TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0"); - vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexManagerEventReceived(vmEvent); - Assert.assertEquals(0, manager.numVertexManagerEventsReceived); // nothing happens - manager.onVertexStarted(emptyCompletions); // now the processing happens - Assert.assertEquals(1, manager.numVertexManagerEventsReceived); - - /** - * Test partition stats - */ - scheduledTasks.clear(); - //{5,9,12,18} in bitmap - final long MB = 1024l * 1024l; - long[] sizes = new long[]{(0l), (1 * MB), (964 * MB), (48 * MB)}; - vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", false); - - manager = createManager(conf, mockContext, 0.01f, 0.75f); - manager.onVertexStarted(emptyCompletions); - Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - - taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0"); - vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1)); - manager.onVertexManagerEventReceived(vmEvent); - Assert.assertEquals(1, manager.numVertexManagerEventsReceived); - - Assert.assertEquals(4, manager.stats.length); - Assert.assertEquals(0, manager.stats[0]); //0 MB bucket - Assert.assertEquals(1, manager.stats[1]); //1 MB bucket - Assert.assertEquals(100, manager.stats[2]); //100 MB bucket - Assert.assertEquals(10, manager.stats[3]); //10 MB bucket - - // sending again from a different version of the same task has not impact - TezTaskAttemptID taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1"); - vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2)); - manager.onVertexManagerEventReceived(vmEvent); - Assert.assertEquals(1, manager.numVertexManagerEventsReceived); - - Assert.assertEquals(4, manager.stats.length); - Assert.assertEquals(0, manager.stats[0]); //0 MB bucket - Assert.assertEquals(1, manager.stats[1]); //1 MB bucket - Assert.assertEquals(100, manager.stats[2]); //100 MB bucket - Assert.assertEquals(10, manager.stats[3]); //10 MB bucket - - // Testing for detailed partition stats - vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", true); - - manager = createManager(conf, mockContext, 0.01f, 0.75f); - manager.onVertexStarted(emptyCompletions); - Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - - taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0"); - vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1)); - manager.onVertexManagerEventReceived(vmEvent); - Assert.assertEquals(1, manager.numVertexManagerEventsReceived); - - Assert.assertEquals(4, manager.stats.length); - Assert.assertEquals(0, manager.stats[0]); - Assert.assertEquals(1, manager.stats[1]); - Assert.assertEquals(964, manager.stats[2]); - Assert.assertEquals(48, manager.stats[3]); - - // sending again from a different version of the same task has not impact - taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1"); - vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2)); - manager.onVertexManagerEventReceived(vmEvent); - Assert.assertEquals(1, manager.numVertexManagerEventsReceived); - - Assert.assertEquals(4, manager.stats.length); - Assert.assertEquals(0, manager.stats[0]); - Assert.assertEquals(1, manager.stats[1]); - Assert.assertEquals(964, manager.stats[2]); - Assert.assertEquals(48, manager.stats[3]); - - /** - * Test for TEZ-978 - * Delay determining parallelism until enough data has been received. - */ - scheduledTasks.clear(); - - //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself. - manager = createManager(conf, mockContext, 0.01f, 0.75f); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - - //First task in src1 completed with small payload - vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId1); - manager.onVertexManagerEventReceived(vmEvent); //small payload - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - Assert.assertTrue(manager.determineParallelismAndApply(0f) == false); - Assert.assertEquals(4, manager.pendingTasks.size()); - Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled - Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); - Assert.assertEquals(1, manager.numVertexManagerEventsReceived); - Assert.assertEquals(1L, manager.completedSourceTasksOutputSize); - - //First task in src2 completed with small payload - vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId2); - manager.onVertexManagerEventReceived(vmEvent); //small payload - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - //Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later - Assert.assertTrue(manager.determineParallelismAndApply(0.25f) == false); - Assert.assertEquals(4, manager.pendingTasks.size()); - Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled - Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted); - Assert.assertEquals(2, manager.numVertexManagerEventsReceived); - Assert.assertEquals(2L, manager.completedSourceTasksOutputSize); - - //First task in src2 completed (with larger payload) to trigger determining parallelism - vmEvent = getVertexManagerEvent(null, 1200L, mockSrcVertexId2); - manager.onVertexManagerEventReceived(vmEvent); - Assert.assertTrue(manager.determineParallelismAndApply(0.25f)); //ensure parallelism is determined - verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); - verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - Assert.assertEquals(0, manager.pendingTasks.size()); - Assert.assertEquals(2, scheduledTasks.size()); - Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted); - Assert.assertEquals(3, manager.numVertexManagerEventsReceived); - Assert.assertEquals(1202L, manager.completedSourceTasksOutputSize); - - //Test for max fraction. Min fraction is just instruction to framework, but honor max fraction - when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20); - when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40); - scheduledTasks.clear(); - - //min/max fraction of 0.0/0.2 - manager = createManager(conf, mockContext, 0.0f, 0.2f); - // initial invocation count == 3 - verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertEquals(40, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks); - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - //send 7 events with payload size as 100 - for(int i=0;i<8;i++) { - //small payload - create new event each time or it will be ignored (from same task) - manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 100L, mockSrcVertexId1)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i)); - //should not change parallelism - verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); - } - for(int i=0;i<3;i++) { - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, i)); - verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); - } - //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); - //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8)); - // parallelism updated - verify(mockContext, times(4)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); - // check exact update value - 8 events with 100 each => 20 -> 2000 => 2 tasks (with 1000 per task) - verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); - - //reset context for next test - when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); - when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); - - // parallelism changed due to small data size scheduledTasks.clear(); // Ensure long overflow doesn't reduce mistakenly // Overflow can occur previously when output size * num tasks for a single vertex would over flow max long // - manager = createManager(conf, mockContext, 1.0f, 1.0f, (long)(Long.MAX_VALUE / 1.5)); + manager = createManager(conf, mockContext, true, (long)(Long.MAX_VALUE / 1.5), 1.0f, 1.0f); manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); @@ -557,8 +142,8 @@ public class TestShuffleVertexManager { manager.onVertexManagerEventReceived(vmEvent); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); // Auto-reduce is triggered - verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); - verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(1)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled Assert.assertEquals(2, scheduledTasks.size()); @@ -575,637 +160,91 @@ public class TestShuffleVertexManager { // parallelism changed due to small data size scheduledTasks.clear(); + } - manager = createManager(conf, mockContext, 0.5f, 0.5f); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); - // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); - Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId1); - manager.onVertexManagerEventReceived(vmEvent); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - Assert.assertEquals(4, manager.pendingTasks.size()); - Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled - Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); - Assert.assertEquals(1, manager.numVertexManagerEventsReceived); - Assert.assertEquals(500L, manager.completedSourceTasksOutputSize); - // ignore duplicate completion - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - Assert.assertEquals(4, manager.pendingTasks.size()); - Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled - Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); - Assert.assertEquals(500L, manager.completedSourceTasksOutputSize); - vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId2); - manager.onVertexManagerEventReceived(vmEvent); - //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2); + @Test(timeout = 5000) + public void testAutoParallelismConfig() throws Exception { + ShuffleVertexManager manager; - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); - // managedVertex tasks reduced - verify(mockContext, times(6)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); - verify(mockContext, times(4)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); - Assert.assertEquals(2, newEdgeManagers.size()); - // TODO improve tests for parallelism - Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled - Assert.assertEquals(2, scheduledTasks.size()); - Assert.assertTrue(scheduledTasks.contains(new Integer(0))); - Assert.assertTrue(scheduledTasks.contains(new Integer(1))); - Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted); - Assert.assertEquals(2, manager.numVertexManagerEventsReceived); - Assert.assertEquals(1000L, manager.completedSourceTasksOutputSize); - - // more completions dont cause recalculation of parallelism - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - verify(mockContext, times(6)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); - Assert.assertEquals(2, newEdgeManagers.size()); - - EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next(); - Map> targets = Maps.newHashMap(); - DataMovementEvent dmEvent = DataMovementEvent.create(1, ByteBuffer.wrap(new byte[0])); - // 4 source task outputs - same as original number of partitions - Assert.assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(0)); - // 4 destination task inputs - 2 source tasks + 2 merged partitions - Assert.assertEquals(4, edgeManager.getNumDestinationTaskPhysicalInputs(0)); - edgeManager.routeDataMovementEventToDestination(dmEvent, 1, dmEvent.getSourceIndex(), targets); - Assert.assertEquals(1, targets.size()); - Map.Entry> e = targets.entrySet().iterator().next(); - Assert.assertEquals(0, e.getKey().intValue()); - Assert.assertEquals(1, e.getValue().size()); - Assert.assertEquals(3, e.getValue().get(0).intValue()); - targets.clear(); - dmEvent = DataMovementEvent.create(2, ByteBuffer.wrap(new byte[0])); - edgeManager.routeDataMovementEventToDestination(dmEvent, 0, dmEvent.getSourceIndex(), targets); - Assert.assertEquals(1, targets.size()); - e = targets.entrySet().iterator().next(); - Assert.assertEquals(1, e.getKey().intValue()); - Assert.assertEquals(1, e.getValue().size()); - Assert.assertEquals(0, e.getValue().get(0).intValue()); - targets.clear(); - edgeManager.routeInputSourceTaskFailedEventToDestination(2, targets); - Assert.assertEquals(2, targets.size()); - for (Map.Entry> entry : targets.entrySet()) { - Assert.assertTrue(entry.getKey().intValue() == 0 || entry.getKey().intValue() == 1); - Assert.assertEquals(2, entry.getValue().size()); - Assert.assertEquals(4, entry.getValue().get(0).intValue()); - Assert.assertEquals(5, entry.getValue().get(1).intValue()); - } + final List scheduledTasks = Lists.newLinkedList(); + + final VertexManagerPluginContext mockContext = createVertexManagerContext( + "Vertex1", 2, "Vertex2", 2, "Vertex3", 2, + "Vertex4", 4, scheduledTasks, null); + + //Check via setters + ShuffleVertexManager.ShuffleVertexManagerConfigBuilder configurer = ShuffleVertexManager + .createConfigBuilder(null); + VertexManagerPluginDescriptor pluginDesc = configurer.setAutoReduceParallelism(true) + .setDesiredTaskInputSize(1000l) + .setMinTaskParallelism(10).setSlowStartMaxSrcCompletionFraction(0.5f).build(); + when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload()); + + + manager = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(), + new Class[]{VertexManagerPluginContext.class}, new Object[]{mockContext}); + manager.initialize(); + verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez notified of reconfig + + Assert.assertTrue(manager.config.isAutoParallelismEnabled()); + Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() == 1000l); + Assert.assertTrue(manager.mgrConfig.getMinTaskParallelism() == 10); + Assert.assertTrue(manager.config.getMinFraction() == 0.25f); + Assert.assertTrue(manager.config.getMaxFraction() == 0.5f); + + configurer = ShuffleVertexManager.createConfigBuilder(null); + pluginDesc = configurer.setAutoReduceParallelism(false).build(); + when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload()); + + manager = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(), + new Class[]{VertexManagerPluginContext.class}, new Object[]{mockContext}); + manager.initialize(); + verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez not notified of reconfig + + Assert.assertTrue(!manager.config.isAutoParallelismEnabled()); + Assert.assertTrue(manager.config.getDesiredTaskInputDataSize() == + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT); + Assert.assertTrue(manager.mgrConfig.getMinTaskParallelism() == 1); + Assert.assertTrue(manager.config.getMinFraction() == + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT); + Assert.assertTrue(manager.config.getMaxFraction() == + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT); } - + @Test(timeout = 5000) - public void testShuffleVertexManagerSlowStart() { + public void testSchedulingWithPartitionStats() throws IOException { Configuration conf = new Configuration(); - ShuffleVertexManagerBase manager = null; - HashMap mockInputVertices = - new HashMap(); - String mockSrcVertexId1 = "Vertex1"; + ShuffleVertexManagerBase manager; + + HashMap mockInputVertices = new HashMap(); + String r1 = "R1"; EdgeProperty eProp1 = EdgeProperty.create( EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in")); - String mockSrcVertexId2 = "Vertex2"; + String m2 = "M2"; EdgeProperty eProp2 = EdgeProperty.create( - EdgeProperty.DataMovementType.SCATTER_GATHER, + EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in")); - String mockSrcVertexId3 = "Vertex3"; + String m3 = "M3"; EdgeProperty eProp3 = EdgeProperty.create( EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in")); - - String mockManagedVertexId = "Vertex4"; - - VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); - when(mockContext.getVertexStatistics(any(String.class))).thenReturn(mock(VertexStatistics.class)); - when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); - when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); - when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1); - - // fail if there is no bipartite src vertex - mockInputVertices.put(mockSrcVertexId3, eProp3); - try { - manager = createManager(conf, mockContext, 0.1f, 0.1f); - manager.onVertexStarted(emptyCompletions); - Assert.assertFalse(true); - } catch (TezUncheckedException e) { - Assert.assertTrue(e.getMessage().contains( - "Atleast 1 bipartite source should exist")); - } - - mockInputVertices.put(mockSrcVertexId1, eProp1); - mockInputVertices.put(mockSrcVertexId2, eProp2); - - // check initialization - manager = createManager(conf, mockContext, 0.1f, 0.1f); - manager.onVertexStarted(emptyCompletions); - Assert.assertTrue(manager.bipartiteSources == 2); - - final HashSet scheduledTasks = new HashSet(); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - scheduledTasks.clear(); - List tasks = (List)args[0]; - for (ScheduleTaskRequest task : tasks) { - scheduledTasks.add(task.getTaskIndex()); - } - return null; - }}).when(mockContext).scheduleTasks(anyList()); - - // source vertices have 0 tasks. immediate start of all managed tasks - when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0); - when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - manager.onVertexStarted(emptyCompletions); - Assert.assertTrue(manager.pendingTasks.isEmpty()); - Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled - - when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); - when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); - - try { - // source vertex have some tasks. min < 0. - manager = createManager(conf, mockContext, -0.1f, 0.0f); - Assert.assertTrue(false); // should not come here - } catch (IllegalArgumentException e) { - Assert.assertTrue(e.getMessage().contains( - "Invalid values for slowStartMinFraction")); - } - - try { - // source vertex have some tasks. max > 1. - manager = createManager(conf, mockContext, 0.0f, 95.0f); - Assert.assertTrue(false); // should not come here - } catch (IllegalArgumentException e) { - Assert.assertTrue(e.getMessage().contains( - "Invalid values for slowStartMinFraction")); - } - - try { - // source vertex have some tasks. min > max - manager = createManager(conf, mockContext, 0.5f, 0.3f); - Assert.assertTrue(false); // should not come here - } catch (IllegalArgumentException e) { - Assert.assertTrue(e.getMessage().contains( - "Invalid values for slowStartMinFraction")); - } - - // source vertex have some tasks. min > default and max undefined - int numTasks = 20; - when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(numTasks); - when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(numTasks); - scheduledTasks.clear(); - - manager = createManager(conf, mockContext, 0.8f, null); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - - Assert.assertEquals(3, manager.pendingTasks.size()); - Assert.assertEquals(numTasks*2, manager.totalNumBipartiteSourceTasks); - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - float completedTasksThreshold = 0.8f * numTasks; - // Finish all tasks before exceeding the threshold - for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) { - for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) { - // complete 0th tasks outside the loop - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i+1)); - if ((i + 2) >= completedTasksThreshold) { - // stop before completing more than min/max source tasks - break; - } - } - } - // Since we haven't exceeded the threshold, all tasks are still pending - Assert.assertEquals(manager.totalTasksToSchedule, manager.pendingTasks.size()); - Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled - - // Cross the threshold min/max threshold to schedule all tasks - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - Assert.assertEquals(3, manager.pendingTasks.size()); - Assert.assertEquals(0, scheduledTasks.size()); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - Assert.assertEquals(0, manager.pendingTasks.size()); - Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled - - // reset vertices for next test - when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); - when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); - - // source vertex have some tasks. min, max == 0 - manager = createManager(conf, mockContext, 0.0f, 0.0f); - manager.onVertexStarted(emptyCompletions); - Assert.assertTrue(manager.totalTasksToSchedule == 3); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); - // all source vertices need to be configured - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - Assert.assertTrue(manager.pendingTasks.isEmpty()); - Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled - - // min, max > 0 and min == max - manager = createManager(conf, mockContext, 0.25f, 0.25f); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); - // task completion on only 1 SG edge does nothing - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - Assert.assertTrue(manager.pendingTasks.isEmpty()); - Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - - // min, max > 0 and min == max == absolute max 1.0 - manager = createManager(conf, mockContext, 1.0f, 1.0f); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 3); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); - Assert.assertTrue(manager.pendingTasks.size() == 3); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 3); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); - Assert.assertTrue(manager.pendingTasks.isEmpty()); - Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); - - // min, max > 0 and min == max - manager = createManager(conf, mockContext, 1.0f, 1.0f); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 3); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); - Assert.assertTrue(manager.pendingTasks.size() == 3); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 3); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); - Assert.assertTrue(manager.pendingTasks.isEmpty()); - Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); - - // reset vertices for next test - when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(4); - when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(4); - - // min, max > and min < max - manager = createManager(conf, mockContext, 0.25f, 0.75f); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); - Assert.assertTrue(manager.pendingTasks.size() == 3); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - // completion of same task again should not get counted - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); - Assert.assertTrue(manager.pendingTasks.size() == 3); - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 1); - Assert.assertTrue(scheduledTasks.size() == 2); // 2 task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2)); - Assert.assertTrue(manager.pendingTasks.size() == 0); - Assert.assertTrue(scheduledTasks.size() == 1); // 1 tasks scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6); - scheduledTasks.clear(); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we are done. no action - Assert.assertTrue(manager.pendingTasks.size() == 0); - Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7); - - // min, max > and min < max - manager = createManager(conf, mockContext, 0.25f, 1.0f); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 2); - Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2)); - Assert.assertTrue(manager.pendingTasks.size() == 1); - Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 3)); - Assert.assertTrue(manager.pendingTasks.size() == 0); - Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8); - - // if there is single task to schedule, it should be schedule when src completed - // fraction is more than min slow start fraction - scheduledTasks.clear(); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(1); - manager = createManager(conf, mockContext, 0.25f, 0.75f); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertTrue(manager.pendingTasks.size() == 1); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); - Assert.assertTrue(manager.pendingTasks.size() == 1); - Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 0); - Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); - scheduledTasks.clear(); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2)); - Assert.assertTrue(manager.pendingTasks.size() == 0); - Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6); - scheduledTasks.clear(); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we are done. no action - Assert.assertTrue(manager.pendingTasks.size() == 0); - Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7); - } - - - /** - * Tasks should be scheduled only when all source vertices are configured completely - * @throws IOException - */ - @Test(timeout = 5000) - public void test_Tez1649_with_scatter_gather_edges() throws IOException { - Configuration conf = new Configuration(); - ShuffleVertexManagerBase manager = null; - - HashMap mockInputVertices_R2 = new HashMap(); - String r1 = "R1"; - EdgeProperty eProp1 = EdgeProperty.create( - EdgeProperty.DataMovementType.SCATTER_GATHER, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - String m2 = "M2"; - EdgeProperty eProp2 = EdgeProperty.create( - EdgeProperty.DataMovementType.SCATTER_GATHER, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - String m3 = "M3"; - EdgeProperty eProp3 = EdgeProperty.create( - EdgeProperty.DataMovementType.SCATTER_GATHER, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - - final String mockManagedVertexId_R2 = "R2"; - mockInputVertices_R2.put(r1, eProp1); - mockInputVertices_R2.put(m2, eProp2); - mockInputVertices_R2.put(m3, eProp3); - - final VertexManagerPluginContext mockContext_R2 = mock(VertexManagerPluginContext.class); - when(mockContext_R2.getInputVertexEdgeProperties()).thenReturn(mockInputVertices_R2); - when(mockContext_R2.getVertexName()).thenReturn(mockManagedVertexId_R2); - when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(3); - when(mockContext_R2.getVertexNumTasks(r1)).thenReturn(3); - when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3); - when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3); - - VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1); - // check initialization - manager = createManager(conf, mockContext_R2, 0.001f, 0.001f); - - final HashSet scheduledTasks = new HashSet(); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - scheduledTasks.clear(); - List tasks = (List)args[0]; - for (ScheduleTaskRequest task : tasks) { - scheduledTasks.add(task.getTaskIndex()); - } - return null; - }}).when(mockContext_R2).scheduleTasks(anyList()); - - manager.onVertexStarted(emptyCompletions); - Assert.assertTrue(manager.bipartiteSources == 3); - manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); - - manager.onVertexManagerEventReceived(vmEvent); - Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks); - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6); - - //Send events for all tasks of m3. - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 1)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2)); - - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6); - - //Send events for m2. But still we need to wait for at least 1 event from r1. - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 1)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6); - - // we need to wait for at least 1 event from r1 to make sure all vertices cross min threshold - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6); - - //Ensure that setVertexParallelism is not called for R2. - verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), - anyMap()); - - //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test - when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(1); - - // complete configuration of r1 triggers the scheduling - manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); - verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), any(VertexLocationHint.class), - anyMap()); - - Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled - Assert.assertTrue(scheduledTasks.size() == 1); - - //try with zero task vertices - scheduledTasks.clear(); - when(mockContext_R2.getInputVertexEdgeProperties()).thenReturn(mockInputVertices_R2); - when(mockContext_R2.getVertexName()).thenReturn(mockManagedVertexId_R2); - when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(3); - when(mockContext_R2.getVertexNumTasks(r1)).thenReturn(0); - when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(0); - when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3); - - manager = createManager(conf, mockContext_R2, 0.001f, 0.001f); - manager.onVertexStarted(emptyCompletions); - Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - - // Only need completed configuration notification from m3 - manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); - Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled - Assert.assertTrue(scheduledTasks.size() == 3); - } - - VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName) throws IOException { - return getVertexManagerEvent(sizes, totalSize, vertexName, false); - } - - VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName, boolean reportDetailedStats) - throws IOException { - ByteBuffer payload = null; - if (sizes != null) { - RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes); - DataOutputBuffer dout = new DataOutputBuffer(); - partitionStats.serialize(dout); - ByteString - partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData()); - if (reportDetailedStats) { - payload = - VertexManagerEventPayloadProto.newBuilder() - .setOutputSize(totalSize) - .setDetailedPartitionStats(ShuffleUtils.getDetailedPartitionStatsForPhysicalOutput(sizes)) - .build().toByteString() - .asReadOnlyByteBuffer(); - } else { - payload = - VertexManagerEventPayloadProto.newBuilder() - .setOutputSize(totalSize) - .setPartitionStats(partitionStatsBytes) - .build().toByteString() - .asReadOnlyByteBuffer(); - } - - } else { - payload = - VertexManagerEventPayloadProto.newBuilder() - .setOutputSize(totalSize) - .build().toByteString() - .asReadOnlyByteBuffer(); - } - TaskAttemptIdentifierImpl taId = new TaskAttemptIdentifierImpl("dag", vertexName, - TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, taskId++), 0)); - VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName, payload); - vmEvent.setProducerAttemptIdentifier(taId); - return vmEvent; - } - - @Test(timeout = 5000) - public void testSchedulingWithPartitionStats() throws IOException { - Configuration conf = new Configuration(); - ShuffleVertexManagerBase manager = null; - - HashMap mockInputVertices = new HashMap(); - String r1 = "R1"; - EdgeProperty eProp1 = EdgeProperty.create( - EdgeProperty.DataMovementType.SCATTER_GATHER, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - String m2 = "M2"; - EdgeProperty eProp2 = EdgeProperty.create( - EdgeProperty.DataMovementType.BROADCAST, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - String m3 = "M3"; - EdgeProperty eProp3 = EdgeProperty.create( - EdgeProperty.DataMovementType.BROADCAST, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - - final String mockManagedVertexId = "R2"; - - mockInputVertices.put(r1, eProp1); - mockInputVertices.put(m2, eProp2); - mockInputVertices.put(m3, eProp3); - + + final String mockManagedVertexId = "R2"; + + mockInputVertices.put(r1, eProp1); + mockInputVertices.put(m2, eProp2); + mockInputVertices.put(m3, eProp3); + VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); @@ -1215,16 +254,8 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(m3)).thenReturn(3); final List scheduledTasks = Lists.newLinkedList(); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - scheduledTasks.clear(); - List tasks = (List)args[0]; - for (ScheduleTaskRequest task : tasks) { - scheduledTasks.add(task.getTaskIndex()); - } - return null; - }}).when(mockContext).scheduleTasks(anyList()); + doAnswer(new ScheduledTasksAnswer(scheduledTasks)).when( + mockContext).scheduleTasks(anyList()); // check initialization manager = createManager(conf, mockContext, 0.001f, 0.001f); @@ -1270,283 +301,18 @@ public class TestShuffleVertexManager { Assert.assertTrue(scheduledTasks.get(2) == 1); } - @Test(timeout = 5000) - public void test_Tez1649_with_mixed_edges() { - Configuration conf = new Configuration(); - ShuffleVertexManagerBase manager = null; - - HashMap mockInputVertices = - new HashMap(); - String r1 = "R1"; - EdgeProperty eProp1 = EdgeProperty.create( - EdgeProperty.DataMovementType.SCATTER_GATHER, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - String m2 = "M2"; - EdgeProperty eProp2 = EdgeProperty.create( - EdgeProperty.DataMovementType.BROADCAST, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - String m3 = "M3"; - EdgeProperty eProp3 = EdgeProperty.create( - EdgeProperty.DataMovementType.BROADCAST, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - - final String mockManagedVertexId = "R2"; - - mockInputVertices.put(r1, eProp1); - mockInputVertices.put(m2, eProp2); - mockInputVertices.put(m3, eProp3); - - VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); - when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); - when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); - when(mockContext.getVertexNumTasks(r1)).thenReturn(3); - when(mockContext.getVertexNumTasks(m2)).thenReturn(3); - when(mockContext.getVertexNumTasks(m3)).thenReturn(3); - - final HashSet scheduledTasks = new HashSet(); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - scheduledTasks.clear(); - List tasks = (List)args[0]; - for (ScheduleTaskRequest task : tasks) { - scheduledTasks.add(task.getTaskIndex()); - } - return null; - }}).when(mockContext).scheduleTasks(anyList()); - - // check initialization - manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(emptyCompletions); - Assert.assertTrue(manager.bipartiteSources == 1); - - manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); - - Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks); - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - - //Send events for 2 tasks of r1. - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); - - //Send an event for m2. - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); - - //Send an event for m3. - manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); - Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled - Assert.assertTrue(scheduledTasks.size() == 3); - - //Scenario when numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks. - //Still, wait for a configuration to be completed from other edges - scheduledTasks.clear(); - manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); - - when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); - when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); - when(mockContext.getVertexNumTasks(r1)).thenReturn(3); - when(mockContext.getVertexNumTasks(m2)).thenReturn(3); - when(mockContext.getVertexNumTasks(m3)).thenReturn(3); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); - - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 2)); - //Tasks from non-scatter edges of m2 and m3 are not complete. - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); - //Got an event from other edges. Schedule all - Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled - Assert.assertTrue(scheduledTasks.size() == 3); - - - //try with a zero task vertex (with non-scatter-gather edges) - scheduledTasks.clear(); - manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(emptyCompletions); - when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); - when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); - when(mockContext.getVertexNumTasks(r1)).thenReturn(3); //scatter gather - when(mockContext.getVertexNumTasks(m2)).thenReturn(0); //broadcast - when(mockContext.getVertexNumTasks(m3)).thenReturn(3); //broadcast - - manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); - - Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks); - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - - //Send 2 events for tasks of r1. - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1)); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(scheduledTasks.size() == 0); - - // event from m3 triggers scheduling - manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); - Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled - Assert.assertTrue(scheduledTasks.size() == 3); - - //try with all zero task vertices in non-SG edges - scheduledTasks.clear(); - manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(emptyCompletions); - when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); - when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); - when(mockContext.getVertexNumTasks(r1)).thenReturn(3); //scatter gather - when(mockContext.getVertexNumTasks(m2)).thenReturn(0); //broadcast - when(mockContext.getVertexNumTasks(m3)).thenReturn(0); //broadcast - - //Send 1 events for tasks of r1. - manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); - Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled - Assert.assertTrue(scheduledTasks.size() == 3); - } - - @Test - public void testZeroTasksSendsConfigured() throws IOException { - Configuration conf = new Configuration(); - ShuffleVertexManagerBase manager = null; - HashMap mockInputVertices = new HashMap(); - String r1 = "R1"; - EdgeProperty eProp1 = EdgeProperty.create( - EdgeProperty.DataMovementType.SCATTER_GATHER, - EdgeProperty.DataSourceType.PERSISTED, - SchedulingType.SEQUENTIAL, - OutputDescriptor.create("out"), - InputDescriptor.create("in")); - - final String mockManagedVertexId = "R2"; - mockInputVertices.put(r1, eProp1); - - final VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); - when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); - when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(0); - - // check initialization - manager = createManager(conf, mockContext, 0.001f, 0.001f); - - final HashSet scheduledTasks = new HashSet(); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - scheduledTasks.clear(); - List tasks = (List)args[0]; - for (ScheduleTaskRequest task : tasks) { - scheduledTasks.add(task.getTaskIndex()); - } - return null; - }}).when(mockContext).scheduleTasks(anyList()); - - manager.onVertexStarted(emptyCompletions); - manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); - Assert.assertEquals(1, manager.bipartiteSources); - Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - Assert.assertEquals(0, manager.totalNumBipartiteSourceTasks); - Assert.assertEquals(0, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(0, scheduledTasks.size()); - verify(mockContext).doneReconfiguringVertex(); - } - - public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) { - VertexIdentifier mockVertex = mock(VertexIdentifier.class); - when(mockVertex.getName()).thenReturn(vName); - TaskIdentifier mockTask = mock(TaskIdentifier.class); - when(mockTask.getIdentifier()).thenReturn(tId); - when(mockTask.getVertexIdentifier()).thenReturn(mockVertex); - TaskAttemptIdentifier mockAttempt = mock(TaskAttemptIdentifier.class); - when(mockAttempt.getIdentifier()).thenReturn(0); - when(mockAttempt.getTaskIdentifier()).thenReturn(mockTask); - return mockAttempt; + private static ShuffleVertexManager createManager(Configuration conf, + VertexManagerPluginContext context, Float min, Float max) { + return createManager(conf, context, true, 1000l, min, max); } - private ShuffleVertexManagerBase createManager(Configuration conf, - VertexManagerPluginContext context, Float min, Float max, Long size) { - if (this.shuffleVertexManagerClass.equals(ShuffleVertexManager.class)) { - return createShuffleVertexManager(conf, context, min, max, size); - } else { - return null; - } - } - - private ShuffleVertexManagerBase createManager(Configuration conf, - VertexManagerPluginContext context, Float min, Float max) { - if (this.shuffleVertexManagerClass.equals(ShuffleVertexManager.class)) { - return createShuffleVertexManager(conf, context, min, max, null); - } else { - return null; - } - } - - private ShuffleVertexManager createShuffleVertexManager(Configuration conf, - VertexManagerPluginContext context, Float min, Float max, Long size) { - if (min != null) { - conf.setFloat( - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, - min); - } else { - conf.unset( - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION); - } - if (max != null) { - conf.setFloat( - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, - max); - } else { - conf.unset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION); - } - conf.setBoolean( - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, - true); - if (size != null) { - conf.setLong( - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, - size); - } else { - conf.setLong( - ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, - 1000L); - } - UserPayload payload; - try { - payload = TezUtils.createUserPayloadFromConf(conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - when(context.getUserPayload()).thenReturn(payload); - ShuffleVertexManager manager = new ShuffleVertexManager(context); - manager.initialize(); - return manager; + private static ShuffleVertexManager createManager(Configuration conf, + VertexManagerPluginContext context, + Boolean enableAutoParallelism, Long desiredTaskInputSize, Float min, + Float max) { + return (ShuffleVertexManager)TestShuffleVertexManagerBase.createManager( + ShuffleVertexManager.class, conf, context, + enableAutoParallelism, desiredTaskInputSize, min, max); } }