Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4981311CBD for ; Thu, 14 Aug 2014 14:05:07 +0000 (UTC) Received: (qmail 36698 invoked by uid 500); 14 Aug 2014 14:05:04 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 36663 invoked by uid 500); 14 Aug 2014 14:05:04 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 36654 invoked by uid 99); 14 Aug 2014 14:05:04 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Aug 2014 14:05:04 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id CE6499BEEDC; Thu, 14 Aug 2014 14:05:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jeagles@apache.org To: commits@tez.apache.org Message-Id: <7bbcfcad34e14f9ba71e0cca3a4e92eb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-1072. Consolidate monitoring APIs in DAGClient (jeagles) Date: Thu, 14 Aug 2014 14:05:03 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 773c0ea0b -> 30b773b5e TEZ-1072. Consolidate monitoring APIs in DAGClient (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/30b773b5 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/30b773b5 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/30b773b5 Branch: refs/heads/master Commit: 30b773b5e65e64367ef035223d7a649138224e0b Parents: 773c0ea Author: Jonathan Eagles Authored: Thu Aug 14 09:02:36 2014 -0500 Committer: Jonathan Eagles Committed: Thu Aug 14 09:02:36 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/client/DAGClient.java | 18 +----------- .../dag/api/client/rpc/DAGClientRPCImpl.java | 16 ++--------- .../tez/dag/api/client/rpc/TestDAGClient.java | 30 ++------------------ .../apache/tez/examples/OrderedWordCount.java | 2 +- .../java/org/apache/tez/examples/WordCount.java | 2 +- .../examples/BroadcastAndOneToOneExample.java | 2 +- .../mapreduce/examples/IntersectDataGen.java | 2 +- .../mapreduce/examples/IntersectExample.java | 2 +- .../mapreduce/examples/IntersectValidate.java | 2 +- .../tez/mapreduce/examples/UnionExample.java | 2 +- .../apache/tez/dag/api/client/MRDAGClient.java | 9 +----- 12 files changed, 15 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ec78b85..468394e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -55,6 +55,7 @@ INCOMPATIBLE CHANGES TEZ-1407. Move MRInput related methods out of MRHelpers and consolidate. TEZ-1194. Make TezUserPayload user facing for payload specification TEZ-1347. Consolidate MRHelpers. + TEZ-1072. Consolidate monitoring APIs in DAGClient Release 0.4.0-incubating: 2014-04-05 http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java index a3e42db..578ad25 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java @@ -83,22 +83,6 @@ public abstract class DAGClient implements Closeable { public abstract DAGStatus waitForCompletion() throws IOException, TezException; /** - * Wait for DAG to complete and print the selected vertex status periodically. - * - * @param vertices - * which vertex details to print; null mean no vertex status and it - * is equivalent to call waitForCompletion() - * @param statusGetOpts - * : status get options. For example, to get counter pass - * EnumSet.of(StatusGetOpts.GET_COUNTERS) - * @return Final DAG Status - * @throws IOException - * @throws TezException - */ - public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set vertices, - @Nullable Set statusGetOpts) throws IOException, TezException; - - /** * Wait for DAG to complete and periodically print *all* vertices' status. * * @param statusGetOpts @@ -108,6 +92,6 @@ public abstract class DAGClient implements Closeable { * @throws IOException * @throws TezException */ - public abstract DAGStatus waitForCompletionWithAllStatusUpdates(@Nullable Set statusGetOpts) + public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set statusGetOpts) throws IOException, TezException; } http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index 3465384..8adeb5a 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -319,28 +319,16 @@ public class DAGClientRPCImpl extends DAGClient { @Override public DAGStatus waitForCompletion() throws IOException, TezException { - return waitForCompletionWithStatusUpdates(null, EnumSet.noneOf(StatusGetOpts.class)); + return _waitForCompletionWithStatusUpdates(null, EnumSet.noneOf(StatusGetOpts.class)); } @Override - public DAGStatus waitForCompletionWithAllStatusUpdates(@Nullable Set statusGetOpts) + public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set statusGetOpts) throws IOException, TezException { Set vertexSet = getDAGStatus(statusGetOpts).getVertexProgress().keySet(); return _waitForCompletionWithStatusUpdates(vertexSet, statusGetOpts); } - @Override - public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set vertices, - @Nullable Set statusGetOpts) throws IOException, TezException { - Set vertexNames = new HashSet(); - if (vertices != null) { - for (Vertex vertex : vertices) { - vertexNames.add(vertex.getName()); - } - } - return _waitForCompletionWithStatusUpdates(vertexNames, statusGetOpts); - } - private DAGStatus _waitForCompletionWithStatusUpdates(@Nullable Set vertexNames, @Nullable Set statusGetOpts) throws IOException, TezException { DAGStatus dagStatus; http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java index 62aca7d..a6068a3 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java @@ -252,33 +252,9 @@ public class TestDAGClient { verify(mockProxy, times(2)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder() .setDagId(dagIdStr).build()); } - + @Test public void testWaitForCompletionWithStatusUpdates() throws Exception{ - // first time return DAG_RUNNING, second time return DAG_SUCCEEDED - when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class))) - .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithoutCounters).build()) - .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus - (DAGStatusProto.newBuilder(dagStatusProtoWithoutCounters).setState(DAGStatusStateProto.DAG_SUCCEEDED).build()) - .build()); - dagClient.waitForCompletionWithStatusUpdates(null, null); - verify(mockProxy, times(2)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder() - .setDagId(dagIdStr).build()); - - // first time return DAG_RUNNING, second time return DAG_SUCCEEDED - when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class))) - .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithCounters).build()) - .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus - (DAGStatusProto.newBuilder(dagStatusProtoWithCounters).setState(DAGStatusStateProto.DAG_SUCCEEDED).build()) - .build()); - dagClient.waitForCompletionWithStatusUpdates(Sets.newSet(new Vertex("v1",null, 1, Resource.newInstance(1024, 1))), - Sets.newSet(StatusGetOpts.GET_COUNTERS)); - verify(mockProxy, times(2)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder() - .setDagId(dagIdStr).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build()); - } - - @Test - public void testWaitForCompletionWithAllStatusUpdates() throws Exception{ // first time and second time return DAG_RUNNING, third time return DAG_SUCCEEDED when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class))) @@ -290,7 +266,7 @@ public class TestDAGClient { // first time for getVertexSet // second & third time for check completion - dagClient.waitForCompletionWithAllStatusUpdates(null); + dagClient.waitForCompletionWithStatusUpdates(null); verify(mockProxy, times(3)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder() .setDagId(dagIdStr).build()); @@ -301,7 +277,7 @@ public class TestDAGClient { .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus (DAGStatusProto.newBuilder(dagStatusProtoWithCounters).setState(DAGStatusStateProto.DAG_SUCCEEDED).build()) .build()); - dagClient.waitForCompletionWithAllStatusUpdates(Sets.newSet(StatusGetOpts.GET_COUNTERS)); + dagClient.waitForCompletionWithStatusUpdates(Sets.newSet(StatusGetOpts.GET_COUNTERS)); verify(mockProxy, times(3)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder() .setDagId(dagIdStr).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build()); } http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java index c33ac3e..8ba8179 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java @@ -185,7 +185,7 @@ public class OrderedWordCount extends Configured implements Tool { DAGClient dagClient = tezClient.submitDAG(dag); // monitoring - DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null); + DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null); if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { System.out.println("OrderedWordCount failed with diagnostics: " + dagStatus.getDiagnostics()); return false; http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/WordCount.java index 54dd2ad..6e49dbc 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/WordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/WordCount.java @@ -218,7 +218,7 @@ public class WordCount extends Configured implements Tool { DAGClient dagClient = tezClient.submitDAG(dag); // monitor the progress and wait for completion. This method blocks until the dag is done. - DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null); + DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null); // check success or failure and print diagnostics if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { System.out.println("WordCount failed with diagnostics: " + dagStatus.getDiagnostics()); http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java index e445359..40319de 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java @@ -213,7 +213,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool { dagClient = tezSession.submitDAG(dag); // monitoring - DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null); + DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null); if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics()); return false; http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java index fda61b0..711240c 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java @@ -181,7 +181,7 @@ public class IntersectDataGen extends Configured implements Tool { tezSession.waitTillReady(); DAGClient dagClient = tezSession.submitDAG(dag); - DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null); + DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null); if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics()); return -1; http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java index 6caafce..f32643d 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java @@ -160,7 +160,7 @@ public class IntersectExample extends Configured implements Tool { tezSession.waitTillReady(); DAGClient dagClient = tezSession.submitDAG(dag); - DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null); + DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null); if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics()); return -1; http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java index 02a7563..b0fcf06 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java @@ -154,7 +154,7 @@ public class IntersectValidate extends Configured implements Tool { tezSession.waitTillReady(); DAGClient dagClient = tezSession.submitDAG(dag); - DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null); + DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null); if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics()); return -1; http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java index 487d6ee..a9b23e0 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java @@ -272,7 +272,7 @@ public class UnionExample { dagClient = tezSession.submitDAG(dag); // monitoring - DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(dag.getVertices(), EnumSet.of(StatusGetOpts.GET_COUNTERS)); + DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(EnumSet.of(StatusGetOpts.GET_COUNTERS)); if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics()); return false; http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java index 7de00b5..8dc7a9b 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java @@ -78,15 +78,8 @@ public class MRDAGClient extends DAGClient { @Override public DAGStatus waitForCompletionWithStatusUpdates( - @Nullable Set vertices, @Nullable Set statusGetOpts) throws IOException, TezException { - return realClient.waitForCompletionWithStatusUpdates(vertices, statusGetOpts); - } - - @Override - public DAGStatus waitForCompletionWithAllStatusUpdates( - @Nullable Set statusGetOpts) throws IOException, TezException { - return realClient.waitForCompletionWithAllStatusUpdates(statusGetOpts); + return realClient.waitForCompletionWithStatusUpdates(statusGetOpts); } @Override