Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A7BAE179A6 for ; Thu, 30 Oct 2014 01:12:45 +0000 (UTC) Received: (qmail 28018 invoked by uid 500); 30 Oct 2014 01:12:45 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 27976 invoked by uid 500); 30 Oct 2014 01:12:45 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 27967 invoked by uid 99); 30 Oct 2014 01:12:45 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Oct 2014 01:12:45 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 503AA91AD96; Thu, 30 Oct 2014 01:12:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Message-Id: <0868f0a171c34821ac89fcb73fcb9d23@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-1699. Vertex.setParallelism should throw an exception for invalid invocations (bikas) Date: Thu, 30 Oct 2014 01:12:45 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 4e69bed5c -> 06e9f88eb TEZ-1699. Vertex.setParallelism should throw an exception for invalid invocations (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/06e9f88e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/06e9f88e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/06e9f88e Branch: refs/heads/master Commit: 06e9f88eb207f00ae2f31f46477001726754115c Parents: 4e69bed Author: Bikas Saha Authored: Wed Oct 29 18:12:41 2014 -0700 Committer: Bikas Saha Committed: Wed Oct 29 18:12:41 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 7 +++ .../tez/dag/api/VertexManagerPluginContext.java | 3 +- .../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 36 ++++++------- .../tez/dag/app/dag/impl/VertexManager.java | 4 +- .../tez/dag/app/dag/impl/TestDAGImpl.java | 12 ++--- .../tez/dag/app/dag/impl/TestVertexImpl.java | 56 +++++++++++++++----- 7 files changed, 78 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 933a445..7bc96ce 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -14,6 +14,8 @@ INCOMPATIBLE CHANGES TEZ-1666. UserPayload should be null if the payload is not specified. 0.5.1 client cannot talk to 0.5.2 AMs (TEZ-1666 and TEZ-1664). context.getUserPayload can now return null, apps may need to add defensive code. + TEZ-1699. Vertex.setParallelism should throw an exception for invalid + invocations ALL CHANGES: TEZ-1620. Wait for application finish before stopping MiniTezCluster @@ -78,6 +80,11 @@ ALL CHANGES: TEZ-1701. ATS fixes to flush all history events and also using batching. TEZ-792. Default staging path should have user name. TEZ-1689. addendum - fix unit test failure. + TEZ-1666. UserPayload should be null if the payload is not specified. + 0.5.1 client cannot talk to 0.5.2 AMs (TEZ-1666 and TEZ-1664). + context.getUserPayload can now return null, apps may need to add defensive code. + TEZ-1699. Vertex.setParallelism should throw an exception for invalid + invocations Release 0.5.1: 2014-10-02 http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java index 5bd8768..c1f4bcd 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java @@ -123,9 +123,8 @@ public interface VertexManagerPluginContext { * @param sourceEdgeManagers Edge Managers to be updated * @param rootInputSpecUpdate Updated Root Input specifications, if any. * If none specified, a default of 1 physical input is used - * @return true if the operation was allowed. */ - public boolean setVertexParallelism(int parallelism, + public void setVertexParallelism(int parallelism, @Nullable VertexLocationHint locationHint, @Nullable Map sourceEdgeManagers, @Nullable Map rootInputSpecUpdate); http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index cefd34d..fa1f2c4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -87,7 +87,7 @@ public interface Vertex extends Comparable { @Nullable TaskLocationHint getTaskLocationHint(TezTaskID taskID); - boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint, + void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, Map sourceEdgeManagers, Map rootInputSpecUpdate) throws AMUserCodeException; void setVertexLocationHint(VertexLocationHint vertexLocationHint); http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index b8e99d4..5c76a77 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1198,14 +1198,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } @Override - public boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint, + public void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, Map sourceEdgeManagers, Map rootInputSpecUpdates) throws AMUserCodeException { - return setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, - false); + setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates, false); } - private boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint, + private void setParallelism(int parallelism, VertexLocationHint vertexLocationHint, Map sourceEdgeManagers, Map rootInputSpecUpdates, boolean recovering) throws AMUserCodeException { @@ -1235,7 +1234,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, LOG.info("Got updated RootInputsSpecs during recovery: " + rootInputSpecUpdates.toString()); this.rootInputSpecs.putAll(rootInputSpecUpdates); } - return true; + return; } finally { writeLock.unlock(); } @@ -1246,8 +1245,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, writeLock.lock(); try { if (parallelismSet == true) { - LOG.info("Parallelism can only be set dynamically once per vertex: " + logIdentifier); - return false; + String msg = "Parallelism can only be set dynamically once per vertex: " + logIdentifier; + LOG.info(msg); + throw new TezUncheckedException(msg); } parallelismSet = true; @@ -1311,9 +1311,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, if (parallelism >= numTasks) { // not that hard to support perhaps. but checking right now since there // is no use case for it and checking may catch other bugs. - LOG.warn("Increasing parallelism is not supported, vertexId=" - + logIdentifier); - return false; + String msg = "Increasing parallelism is not supported, vertexId=" + logIdentifier; + LOG.warn(msg); + throw new TezUncheckedException(msg); } if (parallelism == numTasks) { LOG.info("setParallelism same as current value: " + parallelism + @@ -1342,10 +1342,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, Map.Entry entry = iter.next(); Task task = entry.getValue(); if (task.getState() != TaskState.NEW) { - LOG.warn( - "All tasks must be in initial state when changing parallelism" - + " for vertex: " + getVertexId() + " name: " + getName()); - return false; + String msg = "All tasks must be in initial state when changing parallelism" + + " for vertex: " + getVertexId() + " name: " + getName(); + LOG.warn(msg); + throw new TezUncheckedException(msg); } if (i <= parallelism) { continue; @@ -1407,8 +1407,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } finally { writeLock.unlock(); } - - return true; } public void setVertexLocationHint(VertexLocationHint vertexLocationHint) { @@ -2532,8 +2530,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } boolean successSetParallelism ; try { - successSetParallelism = vertex.setParallelism(0, + vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true); + successSetParallelism = true; } catch (Exception e) { successSetParallelism = false; } @@ -2589,8 +2588,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, break; } try { - successSetParallelism = vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, + vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true); + successSetParallelism = true; } catch (Exception e) { successSetParallelism = false; } http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index c5fe41f..1bfb0f9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -108,11 +108,11 @@ public class VertexManager { } @Override - public boolean setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint, + public void setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint, Map sourceEdgeManagers, Map rootInputSpecUpdate) { try { - return managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, + managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdate); } catch (AMUserCodeException e) { // workaround: convert it to TezUncheckedException which would be caught in VM http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index af48c49..e79eeef 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -889,7 +889,7 @@ public class TestDAGImpl { } @SuppressWarnings("unchecked") - @Test() + @Test(timeout = 5000) public void testEdgeManager_GetNumDestinationTaskPhysicalInputs() { setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationTaskPhysicalInputs); dispatcher.getEventHandler().handle( @@ -912,7 +912,7 @@ public class TestDAGImpl { } @SuppressWarnings("unchecked") - @Test() + @Test(timeout = 5000) public void testEdgeManager_GetNumSourceTaskPhysicalOutputs() { setupDAGWithCustomEdge(ExceptionLocation.GetNumSourceTaskPhysicalOutputs); dispatcher.getEventHandler().handle( @@ -932,7 +932,7 @@ public class TestDAGImpl { } @SuppressWarnings("unchecked") - @Test() + @Test(timeout = 5000) public void testEdgeManager_RouteDataMovementEventToDestination() { setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination); dispatcher.getEventHandler().handle( @@ -962,7 +962,7 @@ public class TestDAGImpl { } @SuppressWarnings("unchecked") - @Test() + @Test(timeout = 5000) public void testEdgeManager_RouteInputSourceTaskFailedEventToDestination() { setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination); dispatcher.getEventHandler().handle( @@ -992,7 +992,7 @@ public class TestDAGImpl { } @SuppressWarnings("unchecked") - @Test() + @Test(timeout = 5000) public void testEdgeManager_GetNumDestinationConsumerTasks() { setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationConsumerTasks); dispatcher.getEventHandler().handle( @@ -1023,7 +1023,7 @@ public class TestDAGImpl { } @SuppressWarnings("unchecked") - @Test() + @Test(timeout = 5000) public void testEdgeManager_RouteInputErrorEventToSource() { setupDAGWithCustomEdge(ExceptionLocation.RouteInputErrorEventToSource); dispatcher.getEventHandler().handle( http://git-wip-us.apache.org/repos/asf/tez/blob/06e9f88e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 55ee05f..fdf0e07 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -72,6 +72,7 @@ import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; @@ -2348,7 +2349,7 @@ public class TestVertexImpl { Map edgeManagerDescriptors = Collections.singletonMap( v1.getName(), mockEdgeManagerDescriptor); - assertTrue(v3.setParallelism(1, null, edgeManagerDescriptors, null)); + v3.setParallelism(1, null, edgeManagerDescriptors, null); assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof EdgeManagerForTest); Assert.assertEquals(1, v3.getTotalTasks()); @@ -2357,7 +2358,46 @@ public class TestVertexImpl { assertTrue(tasks.keySet().iterator().next().equals(firstTask)); } + + @Test(timeout = 5000) + public void testVertexSetParallelismIncreaseException() throws Exception { + initAllVertices(VertexState.INITED); + VertexImpl v3 = vertices.get("vertex3"); + Assert.assertEquals(2, v3.getTotalTasks()); + Map tasks = v3.getTasks(); + Assert.assertEquals(2, tasks.size()); + + VertexImpl v1 = vertices.get("vertex1"); + startVertex(vertices.get("vertex2")); + startVertex(v1); + + // increase not supported + try { + v3.setParallelism(100, null, null, null); + Assert.fail(); + } catch (TezUncheckedException e) { } + } + @Test(timeout = 5000) + public void testVertexSetParallelismMultipleException() throws Exception { + initAllVertices(VertexState.INITED); + VertexImpl v3 = vertices.get("vertex3"); + Assert.assertEquals(2, v3.getTotalTasks()); + Map tasks = v3.getTasks(); + Assert.assertEquals(2, tasks.size()); + + VertexImpl v1 = vertices.get("vertex1"); + startVertex(vertices.get("vertex2")); + startVertex(v1); + v3.setParallelism(1, null, null, null); + + // multiple invocations not supported + try { + v3.setParallelism(1, null, null, null); + Assert.fail(); + } catch (TezUncheckedException e) { } + } + @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexPendingTaskEvents() { @@ -2415,8 +2455,7 @@ public class TestVertexImpl { Map edgeManagerDescriptors = Collections.singletonMap(v3.getName(), edgeManagerDescriptor); - assertTrue(v5.setParallelism(v5.getTotalTasks() - 1, null, - edgeManagerDescriptors, null)); // Must decrease. + v5.setParallelism(v5.getTotalTasks() - 1, null, edgeManagerDescriptors, null); // Must decrease. VertexImpl v5Impl = (VertexImpl) v5; @@ -2938,11 +2977,6 @@ public class TestVertexImpl { } - @Test(timeout = 5000) - public void testCommitterInitAndSetup() { - // FIXME need to add a test for this - } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testBadCommitter() throws Exception { @@ -3252,10 +3286,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState()); } - @Test(timeout = 5000) - public void testHistoryEventGeneration() { - } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testInvalidEvent() { @@ -3391,7 +3421,7 @@ public class TestVertexImpl { initializer.stateUpdates.get(1).getVertexState()); } - @Test(timeout = 1000000) + @Test(timeout = 10000) public void testInputInitializerEventMultipleAttempts() throws Exception { useCustomInitializer = true; customInitializer = new EventHandlingRootInputInitializer(null);