Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 82865172D6 for ; Thu, 5 Feb 2015 21:00:46 +0000 (UTC) Received: (qmail 92018 invoked by uid 500); 5 Feb 2015 21:00:46 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 91983 invoked by uid 500); 5 Feb 2015 21:00:46 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 91766 invoked by uid 99); 5 Feb 2015 21:00:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Feb 2015 21:00:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2CA78DFDC9; Thu, 5 Feb 2015 21:00:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Thu, 05 Feb 2015 21:00:51 -0000 Message-Id: <2b1942730f4242ff9c0b8f994d4eb320@git.apache.org> In-Reply-To: <95f9600993c243ebbcba5227a05620b4@git.apache.org> References: <95f9600993c243ebbcba5227a05620b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [7/8] tez git commit: TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly (bikas) TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b804b8f0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b804b8f0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b804b8f0 Branch: refs/heads/TEZ-2003 Commit: b804b8f03e9d7934202f3838841fe1abb416f480 Parents: 5cf9105 Author: Bikas Saha Authored: Wed Feb 4 19:48:19 2015 -0800 Committer: Bikas Saha Committed: Wed Feb 4 19:48:41 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../event/VertexEventOneToOneSourceSplit.java | 50 --------- .../tez/dag/app/dag/event/VertexEventType.java | 1 - .../apache/tez/dag/app/dag/impl/VertexImpl.java | 111 +------------------ .../tez/dag/app/dag/impl/TestVertexImpl.java | 39 +++---- .../vertexmanager/InputReadyVertexManager.java | 80 ++++++++----- .../TestInputReadyVertexManager.java | 64 ++++++----- 7 files changed, 108 insertions(+), 238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6a494ca..2c54b4b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -176,6 +176,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria before sending notification TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java deleted file mode 100644 index a7e580e..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java +++ /dev/null @@ -1,50 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.dag.app.dag.event; - -import org.apache.tez.dag.records.TezVertexID; - -public class VertexEventOneToOneSourceSplit extends VertexEvent { - final int numTasks; - final TezVertexID originalSplitVertex; - final TezVertexID senderVertex; - - public VertexEventOneToOneSourceSplit(TezVertexID vertexId, - TezVertexID senderVertex, - TezVertexID originalSplitVertex, - int numTasks) { - super(vertexId, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT); - this.numTasks = numTasks; - this.senderVertex = senderVertex; - this.originalSplitVertex = originalSplitVertex; - } - - public int getNumTasks() { - return numTasks; - } - - public TezVertexID getOriginalSplitSource() { - return originalSplitVertex; - } - - public TezVertexID getSenderVertex() { - return senderVertex; - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java index 1d0222e..5eb4929 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java @@ -45,7 +45,6 @@ public enum VertexEventType { V_MANAGER_USER_CODE_ERROR, V_ROUTE_EVENT, - V_ONE_TO_ONE_SOURCE_SPLIT, //Producer: VertexInputInitializer V_ROOT_INPUT_INITIALIZED, http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 9deccd2..865b182 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -116,7 +116,6 @@ import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEvent; import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError; import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized; -import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit; import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex; import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed; import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized; @@ -343,10 +342,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_ROOT_INPUT_INITIALIZED, new RootInputInitializedTransition()) .addTransition(VertexState.INITIALIZING, - EnumSet.of(VertexState.INITIALIZING), - VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, - new OneToOneSourceSplitTransition()) - .addTransition(VertexState.INITIALIZING, EnumSet.of(VertexState.INITED, VertexState.FAILED), VertexEventType.V_READY_TO_INIT, new VertexInitializedTransition()) @@ -400,10 +395,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, new SourceVertexStartedTransition()) .addTransition(VertexState.INITED, EnumSet.of(VertexState.INITED), - VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, - new OneToOneSourceSplitTransition()) - .addTransition(VertexState.INITED, - EnumSet.of(VertexState.INITED), VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) .addTransition(VertexState.INITED, @@ -443,10 +434,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexState.ERROR), VertexEventType.V_TASK_COMPLETED, new TaskCompletedTransition()) - .addTransition(VertexState.RUNNING, - EnumSet.of(VertexState.RUNNING), - VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, - new OneToOneSourceSplitTransition()) .addTransition(VertexState.RUNNING, VertexState.TERMINATING, VertexEventType.V_TERMINATE, new VertexKilledTransition()) @@ -546,7 +533,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_COMPLETED, - VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_NULL_EDGE_INITIALIZED, @@ -569,7 +555,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_TASK_RESCHEDULED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, - VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_COMPLETED, VertexEventType.V_ROOT_INPUT_INITIALIZED, @@ -590,7 +575,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_MANAGER_USER_CODE_ERROR, VertexEventType.V_TASK_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, - VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_RESCHEDULED, VertexEventType.V_INTERNAL_ERROR, @@ -692,7 +676,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private final UserGroupInformation dagUgi; private boolean parallelismSet = false; - private TezVertexID originalOneToOneSplitSource = null; private AtomicBoolean committed = new AtomicBoolean(false); private AtomicBoolean aborted = new AtomicBoolean(false); @@ -1497,21 +1480,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } - for (Map.Entry entry : targetVertices.entrySet()) { - Edge edge = entry.getValue(); - if (edge.getEdgeProperty().getDataMovementType() - == DataMovementType.ONE_TO_ONE) { - // inform these target vertices that we have changed parallelism - VertexEventOneToOneSourceSplit event = - new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(), - getVertexId(), - ((originalOneToOneSplitSource!=null) ? - originalOneToOneSplitSource : getVertexId()), - numTasks); - getEventHandler().handle(event); - } - } - } finally { writeLock.unlock(); } @@ -1532,21 +1500,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @Override public void vertexReconfigurationPlanned() { - vertexReconfigurationPlanned(false); - } - - public void vertexReconfigurationPlanned(boolean testOverride) { writeLock.lock(); try { - if (testOverride) { - Preconditions.checkState(vmIsInitialized.get() && completelyConfiguredSent.get(), - "test should override only failed cases"); - } else { - Preconditions.checkState(!vmIsInitialized.get(), - "context.vertexReconfigurationPlanned() cannot be called after initialize()"); - Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() " - + " cannot be invoked after the vertex has been configured."); - } + Preconditions.checkState(!vmIsInitialized.get(), + "context.vertexReconfigurationPlanned() cannot be called after initialize()"); + Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() " + + " cannot be invoked after the vertex has been configured."); this.vertexToBeReconfiguredByManager = true; } finally { writeLock.unlock(); @@ -3147,68 +3106,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } - public static class OneToOneSourceSplitTransition implements - MultipleArcTransition { - - @Override - public VertexState transition(VertexImpl vertex, VertexEvent event) { - VertexEventOneToOneSourceSplit splitEvent = - (VertexEventOneToOneSourceSplit)event; - TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource(); - - if (vertex.originalOneToOneSplitSource != null) { - VertexState state = vertex.getState(); - Preconditions - .checkState( - (state == VertexState.INITIALIZING - || state == VertexState.INITED || state == VertexState.RUNNING), - " Unexpected 1-1 split for vertex " + vertex.getLogIdentifier() - + " in state " + vertex.getState() + " . Split in vertex " - + originalSplitSource + " sent by vertex " - + splitEvent.getSenderVertex() + " numTasks " - + splitEvent.getNumTasks()); - if (vertex.originalOneToOneSplitSource.equals(originalSplitSource)) { - // ignore another split event that may have come from a different - // path in the DAG. We have already split because of that source - LOG.info("Ignoring split of vertex " + vertex.getLogIdentifier() + - " because of split in vertex " + originalSplitSource + - " sent by vertex " + splitEvent.getSenderVertex() + - " numTasks " + splitEvent.getNumTasks()); - return state; - } - // cannot split from multiple sources - throw new TezUncheckedException("Vertex: " + vertex.getLogIdentifier() + - " asked to split by: " + originalSplitSource + - " but was already split by:" + vertex.originalOneToOneSplitSource); - } - - LOG.info("Splitting vertex " + vertex.getLogIdentifier() + - " because of split in vertex " + originalSplitSource + - " sent by vertex " + splitEvent.getSenderVertex() + - " numTasks " + splitEvent.getNumTasks()); - vertex.originalOneToOneSplitSource = originalSplitSource; - try { - vertex.setParallelism(splitEvent.getNumTasks(), null, null, null, false); - } catch (Exception e) { - // ingore this exception, should not happen - LOG.error("Unexpected exception, Just set Parallelims to a specified value, not involve EdgeManager," - + "exception should not happen here", e); - } - if (vertex.getState() == VertexState.RUNNING || - vertex.getState() == VertexState.INITED) { - return vertex.getState(); - } else { - Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING, - " Unexpected 1-1 split for vertex " + vertex.getLogIdentifier() + - " in state " + vertex.getState() + - " . Split in vertex " + originalSplitSource + - " sent by vertex " + splitEvent.getSenderVertex() + - " numTasks " + splitEvent.getNumTasks()); - return vertex.getState(); - } - } - } - // Temporary to maintain topological order while starting vertices. Not useful // since there's not much difference between the INIT and RUNNING states. public static class SourceVertexStartedTransition implements http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index e94bb17..83a3a8a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -2447,14 +2447,14 @@ public class TestVertexImpl { @Test(timeout = 5000) public void testVertexSetParallelism() throws Exception { - initAllVertices(VertexState.INITED); VertexImpl v3 = vertices.get("vertex3"); + v3.vertexReconfigurationPlanned(); + initAllVertices(VertexState.INITED); Assert.assertEquals(2, v3.getTotalTasks()); Map tasks = v3.getTasks(); Assert.assertEquals(2, tasks.size()); TezTaskID firstTask = tasks.keySet().iterator().next(); - v3.vertexReconfigurationPlanned(true); VertexImpl v1 = vertices.get("vertex1"); startVertex(vertices.get("vertex2")); startVertex(v1); @@ -2477,13 +2477,13 @@ public class TestVertexImpl { @Test(timeout = 5000) public void testVertexSetParallelismIncreaseException() throws Exception { - initAllVertices(VertexState.INITED); VertexImpl v3 = vertices.get("vertex3"); + v3.vertexReconfigurationPlanned(); + initAllVertices(VertexState.INITED); Assert.assertEquals(2, v3.getTotalTasks()); Map tasks = v3.getTasks(); Assert.assertEquals(2, tasks.size()); - v3.vertexReconfigurationPlanned(true); VertexImpl v1 = vertices.get("vertex1"); startVertex(vertices.get("vertex2")); startVertex(v1); @@ -2500,13 +2500,13 @@ public class TestVertexImpl { @Test(timeout = 5000) public void testVertexSetParallelismMultipleException() throws Exception { - initAllVertices(VertexState.INITED); VertexImpl v3 = vertices.get("vertex3"); + v3.vertexReconfigurationPlanned(); + initAllVertices(VertexState.INITED); Assert.assertEquals(2, v3.getTotalTasks()); Map tasks = v3.getTasks(); Assert.assertEquals(2, tasks.size()); - v3.vertexReconfigurationPlanned(true); VertexImpl v1 = vertices.get("vertex1"); startVertex(vertices.get("vertex2")); startVertex(v1); @@ -2561,6 +2561,8 @@ public class TestVertexImpl { @Test(timeout = 5000) public void testSetCustomEdgeManager() throws Exception { + VertexImpl v5 = vertices.get("vertex5"); // Vertex5 linked to v3 (v3 src, v5 dest) + v5.vertexReconfigurationPlanned(); initAllVertices(VertexState.INITED); Edge edge = edges.get("e4"); EdgeManagerPlugin em = edge.getEdgeManager(); @@ -2574,10 +2576,7 @@ public class TestVertexImpl { edgeManagerDescriptor.setUserPayload(userPayload); Vertex v3 = vertices.get("vertex3"); - VertexImpl v5 = vertices.get("vertex5"); // Vertex5 linked to v3 (v3 src, v5 - // dest) - v5.vertexReconfigurationPlanned(true); Map edgeManagerDescriptors = Collections.singletonMap(v3.getName(), edgeManagerDescriptor); v5.setParallelism(v5.getTotalTasks() - 1, null, edgeManagerDescriptors, null, true); // Must decrease. @@ -3396,25 +3395,23 @@ public class TestVertexImpl { Assert.assertEquals(v1Hints.get(i), v1.getTaskLocationHints()[i]); } Assert.assertEquals(true, initializerManager1.hasShutDown); - + + startVertex(v1); + Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks()); - Assert.assertEquals(VertexState.INITED, vertices.get("vertex2").getState()); Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks()); - Assert.assertEquals(VertexState.INITED, vertices.get("vertex3").getState()); Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks()); - Assert.assertEquals(VertexState.INITED, vertices.get("vertex5").getState()); - // v5, v6 still initializing since edge is null - Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState()); + // v4, v6 still initializing since edge is null Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState()); + Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex6").getState()); - startVertex(v1); Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState()); Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState()); Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState()); Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex5").getState()); - // v5, v6 still initializing since edge is null - Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState()); + // v4, v6 still initializing since edge is null Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState()); + Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex6").getState()); mockEdgeManagerDescriptor = EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName()); @@ -3423,7 +3420,7 @@ public class TestVertexImpl { e.setCustomEdgeManager(mockEdgeManagerDescriptor); dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState()); - Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState()); + Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex6").getState()); } @Test(timeout = 5000) @@ -3434,6 +3431,7 @@ public class TestVertexImpl { dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false); setupPostDagCreation(); VertexImpl v1 = vertices.get("vertex1"); + v1.vertexReconfigurationPlanned(); initAllVertices(VertexState.INITED); // fudge vertex manager so that tasks dont start running @@ -3441,7 +3439,6 @@ public class TestVertexImpl { VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()), v1, appContext, mock(StateChangeNotifier.class)); v1.vertexManager.initialize(); - v1.vertexReconfigurationPlanned(true); startVertex(v1); Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks()); @@ -3473,6 +3470,7 @@ public class TestVertexImpl { dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false); setupPostDagCreation(); VertexImpl v1 = vertices.get("vertex1"); + v1.vertexReconfigurationPlanned(); initAllVertices(VertexState.INITED); // fudge vertex manager so that tasks dont start running @@ -3486,7 +3484,6 @@ public class TestVertexImpl { Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks()); // change parallelism int newNumTasks = 3; - v1.vertexReconfigurationPlanned(true); v1.setParallelism(newNumTasks, null, null, null, true); v1.doneReconfiguringVertex(); dispatcher.await(); http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java index f5c187e..e2e9dd3 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -55,8 +56,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin { int oneToOneSrcTasksDoneCount[]; TaskLocationHint oneToOneLocationHints[]; int numOneToOneEdges; - int numSignalsToWaitFor; + int numConfiguredSources; Multimap pendingCompletions = LinkedListMultimap.create(); + AtomicBoolean configured; + AtomicBoolean started; public InputReadyVertexManager(VertexManagerPluginContext context) { super(context); @@ -76,13 +79,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin { } } - void start() { - if (!ready()) { - return; - } + private void configure() { + Preconditions.checkState(!configured.get(), "Vertex: " + getContext().getVertexName()); int numManagedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + getContext().getVertexName()); - taskIsStarted = new boolean[numManagedTasks]; // find out about all input edge types. If there is a custom edge then // TODO Until TEZ-1013 we cannot handle custom input formats @@ -116,32 +116,51 @@ public class InputReadyVertexManager extends VertexManagerPlugin { } if (numOneToOneEdges > 0) { + Preconditions + .checkState(oneToOneSrcTaskCount >= 0, "Vertex: " + getContext().getVertexName()); if (oneToOneSrcTaskCount != numManagedTasks) { - throw new TezUncheckedException( - "Managed task number must equal 1-1 source task number"); + numManagedTasks = oneToOneSrcTaskCount; + // must change parallelism to make them the same + LOG.info("Update parallelism of vertex: " + getContext().getVertexName() + + " to " + oneToOneSrcTaskCount + " to match source 1-1 vertices."); + getContext().setVertexParallelism(oneToOneSrcTaskCount, null, null, null); } oneToOneSrcTasksDoneCount = new int[oneToOneSrcTaskCount]; oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount]; } + + Preconditions.checkState(numManagedTasks >=0, "Vertex: " + getContext().getVertexName()); + taskIsStarted = new boolean[numManagedTasks]; - for (Map.Entry> entry : pendingCompletions.asMap().entrySet()) { - for (Integer task : entry.getValue()) { - handleSourceTaskFinished(entry.getKey(), task); - } - } + // allow scheduling + configured.set(true); + getContext().doneReconfiguringVertex(); + trySchedulingPendingCompletions(); } - boolean ready() { - int target = getContext().getInputVertexEdgeProperties().size() + 1; - Preconditions.checkState(numSignalsToWaitFor <= target); - return (numSignalsToWaitFor == target); + private boolean readyToSchedule() { + return (configured.get() && started.get()); + } + + private void trySchedulingPendingCompletions() { + if (readyToSchedule() && !pendingCompletions.isEmpty()) { + for (Map.Entry> entry : pendingCompletions.asMap().entrySet()) { + for (Integer i : entry.getValue()) { + onSourceTaskCompleted(entry.getKey(), i); + } + } + } } @Override public void initialize() { + // this will prevent vertex from starting until we notify we are done + getContext().vertexReconfigurationPlanned(); Map edges = getContext().getInputVertexEdgeProperties(); // wait for sources and self to start - numSignalsToWaitFor = 0; + numConfiguredSources = 0; + configured = new AtomicBoolean(false); + started = new AtomicBoolean(false); for (String entry : edges.keySet()) { getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED)); } @@ -149,24 +168,33 @@ public class InputReadyVertexManager extends VertexManagerPlugin { @Override public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception { - numSignalsToWaitFor++; - LOG.info("Received configured signal from: " + stateUpdate.getVertexName() + - " numConfiguredSources: " + numSignalsToWaitFor); - start(); + numConfiguredSources++; + int target = getContext().getInputVertexEdgeProperties().size(); + LOG.info("For vertex: " + getContext().getVertexName() + "Received configured signal from: " + + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources + + " needed: " + target); + Preconditions.checkState(numConfiguredSources <= target, "Vertex: " + getContext().getVertexName()); + if (numConfiguredSources == target) { + configure(); + } } @Override public synchronized void onVertexStarted(Map> completions) { for (Map.Entry> entry : completions.entrySet()) { pendingCompletions.putAll(entry.getKey(), entry.getValue()); - } - numSignalsToWaitFor++; - start(); + } + + // allow scheduling + started.set(true); + + trySchedulingPendingCompletions(); } @Override public synchronized void onSourceTaskCompleted(String srcVertexName, Integer taskId) { - if (ready()) { + if (readyToSchedule()) { + // configured and started. try to schedule handleSourceTaskFinished(srcVertexName, taskId); } else { pendingCompletions.put(srcVertexName, taskId); http://git-wip-us.apache.org/repos/asf/tez/blob/b804b8f0/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java index 9a83a51..8de747d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java @@ -31,6 +31,7 @@ import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; @@ -82,8 +83,11 @@ public class TestInputReadyVertexManager { InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); manager.initialize(); - // first source vertex configured + verify(mockContext, times(1)).vertexReconfigurationPlanned(); + // source vertex configured manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + verify(mockContext, times(1)).doneReconfiguringVertex(); + verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); // then own vertex started manager.onVertexStarted(initialCompletions); manager.onSourceTaskCompleted(mockSrcVertexId1, 1); @@ -119,10 +123,12 @@ public class TestInputReadyVertexManager { InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); manager.initialize(); - // first own vertex started - manager.onVertexStarted(initialCompletions); - // then source vertex configured + verify(mockContext, times(1)).vertexReconfigurationPlanned(); + // source vertex configured manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + verify(mockContext, times(1)).doneReconfiguringVertex(); + verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); + manager.onVertexStarted(initialCompletions); verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue()); @@ -174,17 +180,19 @@ public class TestInputReadyVertexManager { InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); manager.initialize(); - // first own vertex started - manager.onVertexStarted(initialCompletions); + verify(mockContext, times(1)).vertexReconfigurationPlanned(); verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); + // ok to have source task complete before anything else manager.onSourceTaskCompleted(mockSrcVertexId1, 1); + // first own vertex started + manager.onVertexStarted(initialCompletions); + // no scheduling as we are not configured yet verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); // then source vertex configured. now we start manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + verify(mockContext, times(1)).doneReconfiguringVertex(); + verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture()); - Assert.assertEquals(2, requestCaptor.getAllValues().size()); - Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue()); manager.onSourceTaskCompleted(mockSrcVertexId1, 2); verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); @@ -247,58 +255,48 @@ public class TestInputReadyVertexManager { Map> initialCompletions = Maps.newHashMap(); - // 1-1 sources do not match managed tasks before vertex started + // 1-1 sources do not match managed tasks. setParallelism called to make them match when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); manager.initialize(); + verify(mockContext, times(1)).vertexReconfigurationPlanned(); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - try { - manager.onVertexStarted(initialCompletions); - Assert.assertTrue("Should have exception", false); - } catch (TezUncheckedException e) { - e.getMessage().contains("Managed task number must equal 1-1 source"); - } - - // 1-1 sources do not match managed tasks after vertex started - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); - manager = new InputReadyVertexManager(mockContext); - manager.initialize(); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); + verify(mockContext, times(1)).setVertexParallelism(3, null, null, null); + verify(mockContext, times(1)).doneReconfiguringVertex(); manager.onVertexStarted(initialCompletions); - when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); - try { - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - Assert.assertTrue("Should have exception", false); - } catch (TezUncheckedException e) { - e.getMessage().contains("Managed task number must equal 1-1 source"); - } // 1-1 sources do not match when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4); manager = new InputReadyVertexManager(mockContext); manager.initialize(); + verify(mockContext, times(2)).vertexReconfigurationPlanned(); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); try { - manager.onVertexStarted(initialCompletions); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue("Should have exception", false); } catch (TezUncheckedException e) { e.getMessage().contains("1-1 source vertices must have identical concurrency"); } + verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(), + anyMap(), anyMap()); // not invoked + + when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3); initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0)); - when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3); manager = new InputReadyVertexManager(mockContext); manager.initialize(); + verify(mockContext, times(3)).vertexReconfigurationPlanned(); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); + verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(), + anyMap(), anyMap()); // not invoked + verify(mockContext, times(2)).doneReconfiguringVertex(); manager.onVertexStarted(initialCompletions); // all 1-1 0's done but not scheduled because v1 is not done manager.onSourceTaskCompleted(mockSrcVertexId3, 0);