Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9E57417A76 for ; Thu, 25 Sep 2014 21:53:59 +0000 (UTC) Received: (qmail 94066 invoked by uid 500); 25 Sep 2014 21:53:59 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 94031 invoked by uid 500); 25 Sep 2014 21:53:59 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 94022 invoked by uid 99); 25 Sep 2014 21:53:59 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Sep 2014 21:53:59 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 33F478809AE; Thu, 25 Sep 2014 21:53:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-1592. Vertex should wait for all initializers to finish before moving to INITED state (sseth via bikas) Date: Thu, 25 Sep 2014 21:53:59 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 51c8a8b71 -> b6790520d TEZ-1592. Vertex should wait for all initializers to finish before moving to INITED state (sseth via bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b6790520 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b6790520 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b6790520 Branch: refs/heads/master Commit: b6790520ddccb499e590efd3b567751e405a833a Parents: 51c8a8b Author: Bikas Saha Authored: Thu Sep 25 14:53:44 2014 -0700 Committer: Bikas Saha Committed: Thu Sep 25 14:53:44 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 70 +++++------- .../tez/dag/app/dag/impl/TestVertexImpl.java | 106 +++++++++++++++++++ 3 files changed, 137 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b6790520/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d7fa961..f790290 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -28,6 +28,8 @@ ALL CHANGES: TEZ-1240. Add system test for propagation of diagnostics for errors TEZ-1618. LocalTaskSchedulerService.getTotalResources() and getAvailableResources() can get negative if JVM memory is larger than 2GB TEZ-1611. Change DataSource/Sink to be able to supply URIs for credentials + TEZ-1592. Vertex should wait for all initializers to finish before moving to + INITED state Release 0.5.1: Unreleased http://git-wip-us.apache.org/repos/asf/tez/blob/b6790520/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index ab22099..34fffd8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -2670,25 +2670,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, + " to set #tasks for the vertex " + vertex.getVertexId()); if (vertex.inputsWithInitializers != null) { - // Use DAGScheduler to arbitrate resources among vertices later - vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager( - vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(), - vertex.eventHandler, -1, - vertex.appContext.getTaskScheduler().getNumClusterNodes(), - vertex.getTaskResource(), - vertex.appContext.getTaskScheduler().getTotalResources()); - List> - inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size()); - for (String inputName : vertex.inputsWithInitializers) { - inputList.add(vertex.rootInputDescriptors.get(inputName)); - } - LOG.info("Vertex will initialize via inputInitializers " - + vertex.logIdentifier + ". Starting root input initializers: " - + vertex.inputsWithInitializers.size()); - vertex.rootInputInitializerManager.runInputInitializers(inputList); - // Send pending rootInputInitializerEvents - vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents); - vertex.pendingInitializerEvents.clear(); + vertex.setupInputInitializerManager(); return VertexState.INITIALIZING; } else { boolean hasOneToOneUninitedSource = false; @@ -2716,27 +2698,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } else { LOG.info("Creating " + vertex.numTasks + " for vertex: " + vertex.logIdentifier); vertex.createTasks(); + if (vertex.inputsWithInitializers != null) { - vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager( - vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(), - vertex.eventHandler, vertex.getTotalTasks(), - vertex.appContext.getTaskScheduler().getNumClusterNodes(), - vertex.getTaskResource(), - vertex.appContext.getTaskScheduler().getTotalResources()); - List> - inputList = Lists.newArrayListWithCapacity(vertex.inputsWithInitializers.size()); - for (String inputName : vertex.inputsWithInitializers) { - inputList.add(vertex.rootInputDescriptors.get(inputName)); - } - LOG.info("Starting root input initializers: " - + vertex.inputsWithInitializers.size()); - // special case when numTasks>0 and still we want to stay in initializing - // state. This is handled in RootInputInitializedTransition specially. - vertex.initWaitsForRootInitializers = true; - vertex.rootInputInitializerManager.runInputInitializers(inputList); - // Send pending rootInputInitializerEvents - vertex.rootInputInitializerManager.handleInitializerEvents(vertex.pendingInitializerEvents); - vertex.pendingInitializerEvents.clear(); + vertex.setupInputInitializerManager(); return VertexState.INITIALIZING; } if (!vertex.uninitializedEdges.isEmpty()) { @@ -2832,8 +2796,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // done. check if we need to do the initialization if (vertex.getState() == VertexState.INITIALIZING && vertex.initWaitsForRootInitializers) { - // set the wait flag to false - vertex.initWaitsForRootInitializers = false; + if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) { + // set the wait flag to false if all initializers are done + vertex.initWaitsForRootInitializers = false; + } // initialize vertex if possible and needed if (vertex.canInitVertex()) { Preconditions.checkState(vertex.numTasks >= 0, @@ -3572,6 +3538,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } + private void setupInputInitializerManager() { + rootInputInitializerManager = createRootInputInitializerManager( + getDAG().getName(), getName(), getVertexId(), + eventHandler, getTotalTasks(), + appContext.getTaskScheduler().getNumClusterNodes(), + getTaskResource(), + appContext.getTaskScheduler().getTotalResources()); + List> + inputList = Lists.newArrayListWithCapacity(inputsWithInitializers.size()); + for (String inputName : inputsWithInitializers) { + inputList.add(rootInputDescriptors.get(inputName)); + } + LOG.info("Vertex will initialize via inputInitializers " + + logIdentifier + ". Starting root input initializers: " + + inputsWithInitializers.size()); + initWaitsForRootInitializers = true; + rootInputInitializerManager.runInputInitializers(inputList); + // Send pending rootInputInitializerEvents + rootInputInitializerManager.handleInitializerEvents(pendingInitializerEvents); + pendingInitializerEvents.clear(); + } + private static class VertexStateChangedCallback implements OnStateChangedCallback { http://git-wip-us.apache.org/repos/asf/tez/blob/b6790520/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index a1b9847..e71acb6 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -496,6 +496,52 @@ public class TestVertexImpl { return dag; } + private DAGPlan createDAGPlanWithMultipleInitializers(String initializerClassName) { + LOG.info("Setting up dag plan with multiple input initializer"); + DAGPlan dag = DAGPlan.newBuilder() + .setName("testVertexWithMultipleInitializers") + .addVertex( + VertexPlan.newBuilder() + .setName("vertex1") + .setType(PlanVertexType.NORMAL) + .addInputs( + RootInputLeafOutputProto.newBuilder() + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + initializerClassName)) + .setName("input1") + .setIODescriptor( + TezEntityDescriptorProto.newBuilder() + .setClassName("InputClazz") + .build() + ).build() + ) + .addInputs( + RootInputLeafOutputProto.newBuilder() + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + initializerClassName)) + .setName("input2") + .setIODescriptor( + TezEntityDescriptorProto.newBuilder() + .setClassName("InputClazz") + .build() + ).build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(-1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x1.y1") + .build() + ) + .build() + ).build(); + return dag; + } + private DAGPlan createDAGPlanWithInputInitializer(String initializerClassName) { LOG.info("Setting up dag plan with input initializer"); DAGPlan dag = DAGPlan.newBuilder() @@ -3969,6 +4015,60 @@ public class TestVertexImpl { return dag; } + @Test(timeout = 5000) + public void testVertexWithMultipleInitializers1() { + useCustomInitializer = true; + setupPreDagCreation(); + dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer"); + setupPostDagCreation(); + + VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices + .get("vertex1"); + + dispatcher.getEventHandler().handle( + new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT)); + dispatcher.await(); + Assert.assertEquals(VertexState.INITIALIZING, v1.getState()); + + RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager(); + List v1Hints = createTaskLocationHints(5); + + // Complete initializer which sets parallelism first + initializerManager1.completeInputInitialization(0, 5, v1Hints); + Assert.assertEquals(VertexState.INITIALIZING, v1.getState()); + + // Complete second initializer + initializerManager1.completeInputInitialization(1); + Assert.assertEquals(VertexState.INITED, v1.getState()); + } + + @Test(timeout = 5000) + public void testVertexWithMultipleInitializers2() { + useCustomInitializer = true; + setupPreDagCreation(); + dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer"); + setupPostDagCreation(); + + VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices + .get("vertex1"); + + dispatcher.getEventHandler().handle( + new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT)); + dispatcher.await(); + Assert.assertEquals(VertexState.INITIALIZING, v1.getState()); + + RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager(); + List v1Hints = createTaskLocationHints(5); + + // Complete initializer which does not set parallelism + initializerManager1.completeInputInitialization(1); + Assert.assertEquals(VertexState.INITIALIZING, v1.getState()); + + // Complete second initializer which sets parallelism + initializerManager1.completeInputInitialization(0, 5, v1Hints); + Assert.assertEquals(VertexState.INITED, v1.getState()); + } + @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexWithInitializerSuccess() { @@ -4363,6 +4463,12 @@ public class TestVertexImpl { dispatcher.await(); } + public void completeInputInitialization(int initializerIndex) { + eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs + .get(initializerIndex).getName(), null)); + dispatcher.await(); + } + public void completeInputInitialization(int initializerIndex, int targetTasks, List locationHints) { List events = Lists.newArrayListWithCapacity(targetTasks + 1);