tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject git commit: TEZ-1072. Consolidate monitoring APIs in DAGClient (jeagles)
Date Thu, 14 Aug 2014 14:05:03 GMT
Repository: tez
Updated Branches:
  refs/heads/master 773c0ea0b -> 30b773b5e


TEZ-1072. Consolidate monitoring APIs in DAGClient (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/30b773b5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/30b773b5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/30b773b5

Branch: refs/heads/master
Commit: 30b773b5e65e64367ef035223d7a649138224e0b
Parents: 773c0ea
Author: Jonathan Eagles <jeagles@gmail.com>
Authored: Thu Aug 14 09:02:36 2014 -0500
Committer: Jonathan Eagles <jeagles@gmail.com>
Committed: Thu Aug 14 09:02:36 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/api/client/DAGClient.java    | 18 +-----------
 .../dag/api/client/rpc/DAGClientRPCImpl.java    | 16 ++---------
 .../tez/dag/api/client/rpc/TestDAGClient.java   | 30 ++------------------
 .../apache/tez/examples/OrderedWordCount.java   |  2 +-
 .../java/org/apache/tez/examples/WordCount.java |  2 +-
 .../examples/BroadcastAndOneToOneExample.java   |  2 +-
 .../mapreduce/examples/IntersectDataGen.java    |  2 +-
 .../mapreduce/examples/IntersectExample.java    |  2 +-
 .../mapreduce/examples/IntersectValidate.java   |  2 +-
 .../tez/mapreduce/examples/UnionExample.java    |  2 +-
 .../apache/tez/dag/api/client/MRDAGClient.java  |  9 +-----
 12 files changed, 15 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ec78b85..468394e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -55,6 +55,7 @@ INCOMPATIBLE CHANGES
   TEZ-1407. Move MRInput related methods out of MRHelpers and consolidate.
   TEZ-1194. Make TezUserPayload user facing for payload specification
   TEZ-1347. Consolidate MRHelpers.
+  TEZ-1072. Consolidate monitoring APIs in DAGClient
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index a3e42db..578ad25 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -83,22 +83,6 @@ public abstract class DAGClient implements Closeable {
   public abstract DAGStatus waitForCompletion() throws IOException, TezException;
 
   /**
-   * Wait for DAG to complete and print the selected vertex status periodically.
-   * 
-   * @param vertices
-   *          which vertex details to print; null mean no vertex status and it
-   *          is equivalent to call <code>waitForCompletion()</code>
-   * @param statusGetOpts
-   *          : status get options. For example, to get counter pass
-   *          <code>EnumSet.of(StatusGetOpts.GET_COUNTERS)</code>
-   * @return Final DAG Status
-   * @throws IOException
-   * @throws TezException
-   */
-  public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<Vertex>
vertices,
-      @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException;
-
-  /**
    * Wait for DAG to complete and periodically print *all* vertices' status.
    * 
    * @param statusGetOpts
@@ -108,6 +92,6 @@ public abstract class DAGClient implements Closeable {
    * @throws IOException
    * @throws TezException
    */
-  public abstract DAGStatus waitForCompletionWithAllStatusUpdates(@Nullable Set<StatusGetOpts>
statusGetOpts)
+  public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts>
statusGetOpts)
       throws IOException, TezException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 3465384..8adeb5a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -319,28 +319,16 @@ public class DAGClientRPCImpl extends DAGClient {
 
   @Override
   public DAGStatus waitForCompletion() throws IOException, TezException {
-    return waitForCompletionWithStatusUpdates(null, EnumSet.noneOf(StatusGetOpts.class));
+    return _waitForCompletionWithStatusUpdates(null, EnumSet.noneOf(StatusGetOpts.class));
   }
 
   @Override
-  public DAGStatus waitForCompletionWithAllStatusUpdates(@Nullable Set<StatusGetOpts>
statusGetOpts)
+  public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts>
statusGetOpts)
       throws IOException, TezException {
     Set<String> vertexSet = getDAGStatus(statusGetOpts).getVertexProgress().keySet();
     return _waitForCompletionWithStatusUpdates(vertexSet, statusGetOpts);
   }
 
-  @Override
-  public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<Vertex> vertices,
-      @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException
{
-    Set<String> vertexNames = new HashSet<String>();
-    if (vertices != null) {
-      for (Vertex vertex : vertices) {
-        vertexNames.add(vertex.getName());
-      }
-    }
-    return _waitForCompletionWithStatusUpdates(vertexNames, statusGetOpts);
-  }
-
   private DAGStatus _waitForCompletionWithStatusUpdates(@Nullable Set<String> vertexNames,
       @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException
{
     DAGStatus dagStatus;

http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 62aca7d..a6068a3 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -252,33 +252,9 @@ public class TestDAGClient {
     verify(mockProxy, times(2)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
       .setDagId(dagIdStr).build());
   }
-  
+
   @Test
   public void testWaitForCompletionWithStatusUpdates() throws Exception{
-    // first time return DAG_RUNNING, second time return DAG_SUCCEEDED
-    when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
-      .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithoutCounters).build())
-      .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus
-                  (DAGStatusProto.newBuilder(dagStatusProtoWithoutCounters).setState(DAGStatusStateProto.DAG_SUCCEEDED).build())
-               .build());
-    dagClient.waitForCompletionWithStatusUpdates(null, null);
-    verify(mockProxy, times(2)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
-        .setDagId(dagIdStr).build());
-    
-    // first time return DAG_RUNNING, second time return DAG_SUCCEEDED
-    when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
-    .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithCounters).build())
-    .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus
-                (DAGStatusProto.newBuilder(dagStatusProtoWithCounters).setState(DAGStatusStateProto.DAG_SUCCEEDED).build())
-             .build());
-    dagClient.waitForCompletionWithStatusUpdates(Sets.newSet(new Vertex("v1",null, 1, Resource.newInstance(1024,
1))), 
-        Sets.newSet(StatusGetOpts.GET_COUNTERS));
-    verify(mockProxy, times(2)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
-        .setDagId(dagIdStr).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
-  }
-  
-  @Test
-  public void testWaitForCompletionWithAllStatusUpdates() throws Exception{
 
     // first time and second time return DAG_RUNNING, third time return DAG_SUCCEEDED
     when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
@@ -290,7 +266,7 @@ public class TestDAGClient {
     
     //  first time for getVertexSet
     //  second & third time for check completion
-    dagClient.waitForCompletionWithAllStatusUpdates(null);
+    dagClient.waitForCompletionWithStatusUpdates(null);
     verify(mockProxy, times(3)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
         .setDagId(dagIdStr).build());
 
@@ -301,7 +277,7 @@ public class TestDAGClient {
       .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus
                 (DAGStatusProto.newBuilder(dagStatusProtoWithCounters).setState(DAGStatusStateProto.DAG_SUCCEEDED).build())
              .build());
-    dagClient.waitForCompletionWithAllStatusUpdates(Sets.newSet(StatusGetOpts.GET_COUNTERS));
+    dagClient.waitForCompletionWithStatusUpdates(Sets.newSet(StatusGetOpts.GET_COUNTERS));
     verify(mockProxy, times(3)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
       .setDagId(dagIdStr).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
index c33ac3e..8ba8179 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -185,7 +185,7 @@ public class OrderedWordCount extends Configured implements Tool  {
         DAGClient dagClient = tezClient.submitDAG(dag);
 
         // monitoring
-        DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
         if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
           System.out.println("OrderedWordCount failed with diagnostics: " + dagStatus.getDiagnostics());
           return false;

http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/WordCount.java
index 54dd2ad..6e49dbc 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/WordCount.java
@@ -218,7 +218,7 @@ public class WordCount extends Configured implements Tool {
         DAGClient dagClient = tezClient.submitDAG(dag);
 
         // monitor the progress and wait for completion. This method blocks until the dag
is done.
-        DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
         // check success or failure and print diagnostics
         if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
           System.out.println("WordCount failed with diagnostics: " + dagStatus.getDiagnostics());

http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index e445359..40319de 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -213,7 +213,7 @@ public class BroadcastAndOneToOneExample extends Configured implements
Tool {
         dagClient = tezSession.submitDAG(dag);
 
         // monitoring
-        DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
         if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
           System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
           return false;

http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index fda61b0..711240c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -181,7 +181,7 @@ public class IntersectDataGen extends Configured implements Tool {
 
     tezSession.waitTillReady();
     DAGClient dagClient = tezSession.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
     if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
       LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
       return -1;

http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index 6caafce..f32643d 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -160,7 +160,7 @@ public class IntersectExample extends Configured implements Tool {
 
     tezSession.waitTillReady();
     DAGClient dagClient = tezSession.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
     if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
       LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
       return -1;

http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index 02a7563..b0fcf06 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -154,7 +154,7 @@ public class IntersectValidate extends Configured implements Tool {
 
     tezSession.waitTillReady();
     DAGClient dagClient = tezSession.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
     if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
       LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
       return -1;

http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 487d6ee..a9b23e0 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -272,7 +272,7 @@ public class UnionExample {
         dagClient = tezSession.submitDAG(dag);
 
         // monitoring
-        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(dag.getVertices(),
EnumSet.of(StatusGetOpts.GET_COUNTERS));
+        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(EnumSet.of(StatusGetOpts.GET_COUNTERS));
         if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
           System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
           return false;

http://git-wip-us.apache.org/repos/asf/tez/blob/30b773b5/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
index 7de00b5..8dc7a9b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
@@ -78,15 +78,8 @@ public class MRDAGClient extends DAGClient {
 
   @Override
   public DAGStatus waitForCompletionWithStatusUpdates(
-      @Nullable Set<Vertex> vertices,
       @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException
{
-    return realClient.waitForCompletionWithStatusUpdates(vertices, statusGetOpts);
-  }
-
-  @Override
-  public DAGStatus waitForCompletionWithAllStatusUpdates(
-      @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException
{
-    return realClient.waitForCompletionWithAllStatusUpdates(statusGetOpts);
+    return realClient.waitForCompletionWithStatusUpdates(statusGetOpts);
   }
 
   @Override


Mime
View raw message