Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D99CF17DA9 for ; Fri, 27 Mar 2015 18:50:37 +0000 (UTC) Received: (qmail 69655 invoked by uid 500); 27 Mar 2015 18:50:37 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 69618 invoked by uid 500); 27 Mar 2015 18:50:37 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 69609 invoked by uid 99); 27 Mar 2015 18:50:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Mar 2015 18:50:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8F3B9E1817; Fri, 27 Mar 2015 18:50:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Message-Id: <5474f7cc316346579002116883ff38fa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2242. Refactor ShuffleVertexManager code (bikas) (cherry picked from commit 505febd63e6c2fdaf882540d5c55f75fb30b7190) Date: Fri, 27 Mar 2015 18:50:37 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.6 05bfdbec5 -> ff7883e06 TEZ-2242. Refactor ShuffleVertexManager code (bikas) (cherry picked from commit 505febd63e6c2fdaf882540d5c55f75fb30b7190) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ff7883e0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ff7883e0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ff7883e0 Branch: refs/heads/branch-0.6 Commit: ff7883e062928e550568af151e5bd14a9de2cb92 Parents: 05bfdbe Author: Bikas Saha Authored: Fri Mar 27 11:46:00 2015 -0700 Committer: Bikas Saha Committed: Fri Mar 27 11:48:31 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../vertexmanager/ShuffleVertexManager.java | 68 ++++++++++++++------ .../vertexmanager/TestShuffleVertexManager.java | 27 ++++---- 3 files changed, 65 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ff7883e0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c849b2e..ee8115f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2242. Refactor ShuffleVertexManager code TEZ-2205. Tez still tries to post to ATS when yarn.timeline-service.enabled=false. TEZ-2047. Build fails against hadoop-2.2 post TEZ-2018 TEZ-2064. SessionNotRunning Exception not thrown is all cases http://git-wip-us.apache.org/repos/asf/tez/blob/ff7883e0/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 05f94c5..a356829 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -141,6 +141,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { @VisibleForTesting int bipartiteSources = 0; long completedSourceTasksOutputSize = 0; + List pendingStateUpdates = Lists.newArrayList(); class SourceVertexInfo { EdgeProperty edgeProperty; @@ -327,7 +328,26 @@ public class ShuffleVertexManager extends VertexManagerPlugin { @Override - public void onVertexStarted(Map> completions) { + public synchronized void onVertexStarted(Map> completions) { + // examine edges after vertex started because until then these may not have been defined + Map inputs = getContext().getInputVertexEdgeProperties(); + for(Map.Entry entry : inputs.entrySet()) { + srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue())); + // TODO what if derived class has already called this + getContext().registerForVertexStateUpdates(entry.getKey(), + EnumSet.of(VertexState.CONFIGURED)); + if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) { + bipartiteSources++; + } + } + if(bipartiteSources == 0) { + throw new TezUncheckedException("Atleast 1 bipartite source should exist"); + } + for (VertexStateUpdate stateUpdate : pendingStateUpdates) { + handleVertexStateUpdate(stateUpdate); + } + pendingStateUpdates.clear(); + // track the tasks in this vertex updatePendingTasks(); updateSourceTaskCount(); @@ -349,7 +369,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) { + public synchronized void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) { updateSourceTaskCount(); SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName); @@ -369,7 +389,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } @Override - public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { + public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { // TODO handle duplicates from retries if (enableAutoParallelism) { // save output size @@ -679,19 +699,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin { + " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:" + minTaskParallelism); - Map inputs = getContext().getInputVertexEdgeProperties(); - for(Map.Entry entry : inputs.entrySet()) { - srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue())); - getContext().registerForVertexStateUpdates(entry.getKey(), - EnumSet.of(VertexState.CONFIGURED)); - if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) { - bipartiteSources++; - } - } - if(bipartiteSources == 0) { - throw new TezUncheckedException("Atleast 1 bipartite source should exist"); - } - if (enableAutoParallelism) { getContext().vertexReconfigurationPlanned(); } @@ -700,8 +707,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } - @Override - public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + private void handleVertexStateUpdate(VertexStateUpdate stateUpdate) { Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED, "Received incorrect state notification : " + stateUpdate.getVertexState() + " for vertex: " + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName()); @@ -717,7 +723,31 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } @Override - public void onRootVertexInitialized(String inputName, + public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + if (stateUpdate.getVertexState() == VertexState.CONFIGURED) { + // we will not register for updates until our vertex starts. + // derived classes can make other update requests for other states that we should + // ignore. However that will not be allowed until the state change notified supports + // multiple registers for the same vertex + if (onVertexStartedDone.get()) { + // normally this if check will always be true because we register after vertex + // start. + handleVertexStateUpdate(stateUpdate); + } else { + // normally this code will not trigger since we are the ones who register for + // the configured states updates and that will happen after vertex starts. + // So this code will only trigger if a derived class also registers for updates + // for the same vertices but multiple registers to the same vertex is currently + // not supported by the state change notifier code. This is just future proofing + // when that is supported + // vertex not started yet. So edge info may not have been defined correctly yet. + pendingStateUpdates.add(stateUpdate); + } + } + } + + @Override + public synchronized void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List events) { // Not allowing this for now. Nothing to do. } http://git-wip-us.apache.org/repos/asf/tez/blob/ff7883e0/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 76c0aa6..4d9302e 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -212,17 +212,16 @@ public class TestShuffleVertexManager { // check initialization manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig - verify(mockContext, times(2)).vertexReconfigurationPlanned(); - Assert.assertTrue(manager.bipartiteSources == 2); - - // source vertices have 0 tasks. when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0); when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0); when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1); - // check waiting for notification before scheduling manager.onVertexStarted(null); + verify(mockContext, times(2)).vertexReconfigurationPlanned(); + Assert.assertTrue(manager.bipartiteSources == 2); + + // check waiting for notification before scheduling Assert.assertFalse(manager.pendingTasks.isEmpty()); // source vertices have 0 tasks. so only 1 notification needed. triggers scheduling manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); @@ -235,12 +234,14 @@ public class TestShuffleVertexManager { // check scheduling only after onVertexStarted manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig verify(mockContext, times(3)).vertexReconfigurationPlanned(); - Assert.assertTrue(manager.bipartiteSources == 2); // source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling + // normally this event will not come before onVertexStarted() is called manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done + verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled + // trigger start and processing of pending notification events manager.onVertexStarted(null); + Assert.assertTrue(manager.bipartiteSources == 2); verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled @@ -500,6 +501,7 @@ public class TestShuffleVertexManager { mockInputVertices.put(mockSrcVertexId3, eProp3); try { manager = createManager(conf, mockContext, 0.1f, 0.1f); + manager.onVertexStarted(null); Assert.assertFalse(true); } catch (TezUncheckedException e) { Assert.assertTrue(e.getMessage().contains( @@ -511,6 +513,7 @@ public class TestShuffleVertexManager { // check initialization manager = createManager(conf, mockContext, 0.1f, 0.1f); + manager.onVertexStarted(null); Assert.assertTrue(manager.bipartiteSources == 2); final HashSet scheduledTasks = new HashSet(); @@ -791,7 +794,6 @@ public class TestShuffleVertexManager { // check initialization manager = createManager(conf, mockContext_R2, 0.001f, 0.001f); - Assert.assertTrue(manager.bipartiteSources == 3); final HashSet scheduledTasks = new HashSet(); doAnswer(new Answer() { @@ -806,6 +808,7 @@ public class TestShuffleVertexManager { }}).when(mockContext_R2).scheduleVertexTasks(anyList()); manager.onVertexStarted(null); + Assert.assertTrue(manager.bipartiteSources == 3); manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); @@ -915,10 +918,6 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(m2)).thenReturn(3); when(mockContext.getVertexNumTasks(m3)).thenReturn(3); - // check initialization - manager = createManager(conf, mockContext, 0.001f, 0.001f); - Assert.assertTrue(manager.bipartiteSources == 1); - final HashSet scheduledTasks = new HashSet(); doAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { @@ -931,7 +930,11 @@ public class TestShuffleVertexManager { return null; }}).when(mockContext).scheduleVertexTasks(anyList()); + // check initialization + manager = createManager(conf, mockContext, 0.001f, 0.001f); manager.onVertexStarted(null); + Assert.assertTrue(manager.bipartiteSources == 1); + manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));