tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-387. Move DAGClientHandler into its own class (Jeff Zhang via bikas)
Date Fri, 27 Jun 2014 22:29:22 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 5ae6a40b6 -> df7ff3cb4


TEZ-387. Move DAGClientHandler into its own class (Jeff Zhang via bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/df7ff3cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/df7ff3cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/df7ff3cb

Branch: refs/heads/master
Commit: df7ff3cb487fe835da17caa78024ea6f3aa76955
Parents: 5ae6a40
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Jun 27 15:29:12 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Jun 27 15:29:12 2014 -0700

----------------------------------------------------------------------
 .../tez/dag/api/client/DAGClientHandler.java    | 126 ++++++++++++++++++
 .../tez/dag/api/client/DAGClientServer.java     |   1 -
 ...DAGClientAMProtocolBlockingPBServerImpl.java |   2 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 109 ++--------------
 .../dag/api/client/TestDAGClientHandler.java    | 129 +++++++++++++++++++
 5 files changed, 266 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/df7ff3cb/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
new file mode 100644
index 0000000..e1392d1
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -0,0 +1,126 @@
+package org.apache.tez.dag.api.client;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.tez.client.PreWarmContext;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.records.TezDAGID;
+
+public class DAGClientHandler {
+
+  private Log LOG = LogFactory.getLog(DAGClientHandler.class);
+
+  private DAGAppMaster dagAppMaster;
+  
+  public DAGClientHandler(DAGAppMaster dagAppMaster) {
+    this.dagAppMaster = dagAppMaster;
+  }
+
+  private DAG getCurrentDAG() {
+    return dagAppMaster.getContext().getCurrentDAG();
+  }
+
+  public List<String> getAllDAGs() throws TezException {
+    return Collections.singletonList(getCurrentDAG().getID().toString());
+  }
+
+  public DAGStatus getDAGStatus(String dagIdStr,
+      Set<StatusGetOpts> statusOptions) throws TezException {
+    return getDAG(dagIdStr).getDAGStatus(statusOptions);
+  }
+
+  public VertexStatus getVertexStatus(String dagIdStr, String vertexName,
+      Set<StatusGetOpts> statusOptions) throws TezException {
+    VertexStatus status =
+        getDAG(dagIdStr).getVertexStatus(vertexName, statusOptions);
+    if (status == null) {
+      throw new TezException("Unknown vertexName: " + vertexName);
+    }
+
+    return status;
+  }
+
+  DAG getDAG(String dagIdStr) throws TezException {
+    TezDAGID dagId = TezDAGID.fromString(dagIdStr);
+    if (dagId == null) {
+      throw new TezException("Bad dagId: " + dagIdStr);
+    }
+
+    DAG currentDAG = getCurrentDAG();
+    if (currentDAG == null) {
+      throw new TezException("No running dag at present");
+    }
+    if (!currentDAG.getID().toString().equals(dagId.toString())) {
+      LOG.warn("Current DAGID : "
+          + (currentDAG.getID() == null ? "NULL" : currentDAG.getID())
+          + ", Looking for string (not found): " + dagIdStr + ", dagIdObj: "
+          + dagId);
+      throw new TezException("Unknown dagId: " + dagIdStr);
+    }
+
+    return currentDAG;
+  }
+
+  public void tryKillDAG(String dagIdStr) throws TezException {
+    DAG dag = getDAG(dagIdStr);
+    LOG.info("Sending client kill to dag: " + dagIdStr);
+    dagAppMaster.tryKillDAG(dag);
+  }
+
+  public synchronized String submitDAG(DAGPlan dagPlan,
+      Map<String, LocalResource> additionalAmResources) throws TezException {
+    return dagAppMaster.submitDAGToAppMaster(dagPlan, additionalAmResources);
+  }
+
+  public synchronized void shutdownAM() {
+    LOG.info("Received message to shutdown AM");
+    if (dagAppMaster != null) {
+      dagAppMaster.shutdownTezAM();
+    }
+  }
+
+  public synchronized TezSessionStatus getSessionStatus() throws TezException {
+    if (!dagAppMaster.isSession()) {
+      throw new TezException("Unsupported operation as AM not running in"
+          + " session mode");
+    }
+    switch (dagAppMaster.getState()) {
+    case NEW:
+    case INITED:
+      return TezSessionStatus.INITIALIZING;
+    case IDLE:
+      return TezSessionStatus.READY;
+    case RECOVERING:
+    case RUNNING:
+      return TezSessionStatus.RUNNING;
+    case ERROR:
+    case FAILED:
+    case SUCCEEDED:
+    case KILLED:
+      return TezSessionStatus.SHUTDOWN;
+    }
+    return TezSessionStatus.INITIALIZING;
+  }
+
+  public synchronized void preWarmContainers(PreWarmContext preWarmContext)
+      throws TezException {
+    if (dagAppMaster == null) {
+      throw new TezException("DAG App Master is not initialized");
+    }
+    dagAppMaster.startPreWarmContainers(preWarmContext);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/df7ff3cb/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
index 3c2e669..34980b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
@@ -39,7 +39,6 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPBServerImpl;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.DAGClientAMProtocol;
-import org.apache.tez.dag.app.DAGAppMaster.DAGClientHandler;
 import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider;
 
 import com.google.protobuf.BlockingService;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/df7ff3cb/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index f205f9f..6f088d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClientHandler;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.VertexStatus;
@@ -46,7 +47,6 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRespons
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGResponseProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.dag.app.DAGAppMaster.DAGClientHandler;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/df7ff3cb/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 7189300..c3661f9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app;
 
 import com.google.common.base.Preconditions;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.File;
@@ -87,7 +88,6 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.common.TezUtils;
@@ -101,10 +101,8 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClientHandler;
 import org.apache.tez.dag.api.client.DAGClientServer;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
@@ -304,7 +302,7 @@ public class DAGAppMaster extends AbstractService {
     dispatcher = createDispatcher();
     context = new RunningAppContext(conf);
 
-    clientHandler = new DAGClientHandler();
+    clientHandler = new DAGClientHandler(this);
 
     addIfService(dispatcher, false);
 
@@ -914,7 +912,7 @@ public class DAGAppMaster extends AbstractService {
         + oldState + " new state: " + state);
   }
 
-  synchronized void shutdownTezAM() {
+  public synchronized void shutdownTezAM() {
     sessionStopped.set(true);
     this.taskSchedulerEventHandler.setShouldUnregisterFlag();
     if (currentDAG != null
@@ -932,7 +930,7 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  synchronized String submitDAGToAppMaster(DAGPlan dagPlan,
+  public synchronized String submitDAGToAppMaster(DAGPlan dagPlan,
       Map<String, LocalResource> additionalResources) throws TezException {
     if(currentDAG != null
         && !state.equals(DAGAppMasterState.IDLE)) {
@@ -971,7 +969,7 @@ public class DAGAppMaster extends AbstractService {
     return currentDAG.getID().toString();
   }
 
-  synchronized void startPreWarmContainers(PreWarmContext preWarmContext)
+  public synchronized void startPreWarmContainers(PreWarmContext preWarmContext)
       throws TezException {
     // Check if there is a running DAG
     if(currentDAG != null
@@ -1016,98 +1014,11 @@ public class DAGAppMaster extends AbstractService {
     startDAG(dag.createDag(amConf), null);
   }
 
-  public class DAGClientHandler {
-
-    public List<String> getAllDAGs() throws TezException {
-      return Collections.singletonList(currentDAG.getID().toString());
-    }
-
-    public DAGStatus getDAGStatus(String dagIdStr,
-                                  Set<StatusGetOpts> statusOptions)
-        throws TezException {
-      return getDAG(dagIdStr).getDAGStatus(statusOptions);
-    }
-
-    public VertexStatus getVertexStatus(String dagIdStr, String vertexName,
-        Set<StatusGetOpts> statusOptions)
-        throws TezException{
-      VertexStatus status = getDAG(dagIdStr)
-          .getVertexStatus(vertexName, statusOptions);
-      if(status == null) {
-        throw new TezException("Unknown vertexName: " + vertexName);
-      }
-
-      return status;
-    }
-
-    DAG getDAG(String dagIdStr) throws TezException {
-      TezDAGID dagId = TezDAGID.fromString(dagIdStr);
-      if(dagId == null) {
-        throw new TezException("Bad dagId: " + dagIdStr);
-      }
-
-      if(currentDAG == null) {
-        throw new TezException("No running dag at present");
-      }
-      if(!dagId.equals(currentDAG.getID())) {
-        LOG.warn("Current DAGID : "
-            + (currentDAG.getID() == null ? "NULL" : currentDAG.getID())
-            + ", Looking for string (not found): " + dagIdStr + ", dagIdObj: "
-            + dagId);
-        throw new TezException("Unknown dagId: " + dagIdStr);
-      }
-
-      return currentDAG;
-    }
-
-    public void tryKillDAG(String dagIdStr)
-        throws TezException {
-      DAG dag = getDAG(dagIdStr);
-      LOG.info("Sending client kill to dag: " + dagIdStr);
-      //send a DAG_KILL message
-      sendEvent(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
-    }
-
-    public synchronized String submitDAG(DAGPlan dagPlan,
-        Map<String, LocalResource> additionalAmResources) throws TezException {
-      return submitDAGToAppMaster(dagPlan, additionalAmResources);
-    }
-
-    public synchronized void shutdownAM() {
-      LOG.info("Received message to shutdown AM");
-      shutdownTezAM();
-    }
-
-    public synchronized TezSessionStatus getSessionStatus() throws TezException {
-      if (!isSession) {
-        throw new TezException("Unsupported operation as AM not running in"
-            + " session mode");
-      }
-      switch (state) {
-      case NEW:
-      case INITED:
-        return TezSessionStatus.INITIALIZING;
-      case IDLE:
-        return TezSessionStatus.READY;
-      case RECOVERING:
-      case RUNNING:
-        return TezSessionStatus.RUNNING;
-      case ERROR:
-      case FAILED:
-      case SUCCEEDED:
-      case KILLED:
-        return TezSessionStatus.SHUTDOWN;
-      }
-      return TezSessionStatus.INITIALIZING;
-    }
-
-    public synchronized void preWarmContainers(PreWarmContext preWarmContext)
-        throws TezException {
-      startPreWarmContainers(preWarmContext);
-    }
-
+  @SuppressWarnings("unchecked")
+  public void tryKillDAG(DAG dag){
+    dispatcher.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
   }
-
+  
   private Map<String, LocalResource> getAdditionalLocalResourceDiff(
      DAG dag, Map<String, LocalResource> additionalResources) throws TezException {
     if (additionalResources == null) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/df7ff3cb/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
new file mode 100644
index 0000000..2756906
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
@@ -0,0 +1,129 @@
+package org.apache.tez.dag.api.client;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.client.PreWarmContext;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.dag.app.DAGAppMasterState;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.internal.util.collections.Sets;
+
+
+public class TestDAGClientHandler {
+  
+  @Test(timeout = 1000)
+  @SuppressWarnings("unchecked")
+  public void testDAGClientHandler() throws TezException {
+
+    TezDAGID mockTezDAGId = mock(TezDAGID.class);
+    when(mockTezDAGId.getId()).thenReturn(1);
+    when(mockTezDAGId.toString()).thenReturn("dag_9999_0001_1");
+
+    DAG mockDAG = mock(DAG.class);
+    when(mockDAG.getID()).thenReturn(mockTezDAGId);
+    DAGStatusBuilder mockDagStatusBuilder = mock(DAGStatusBuilder.class);
+    when(mockDAG.getDAGStatus(anySetOf(StatusGetOpts.class))).thenReturn(
+        mockDagStatusBuilder);
+    VertexStatusBuilder mockVertexStatusBuilder =
+        mock(VertexStatusBuilder.class);
+    when(mockDAG.getVertexStatus(anyString(), anySetOf(StatusGetOpts.class)))
+        .thenReturn(mockVertexStatusBuilder);
+
+    DAGAppMaster mockDagAM = mock(DAGAppMaster.class);
+    AppContext mockAppContext = mock(AppContext.class);
+    when(mockDagAM.getContext()).thenReturn(mockAppContext);
+    when(mockDagAM.getContext().getCurrentDAG()).thenReturn(mockDAG);
+
+    DAGClientHandler dagClientHandler = new DAGClientHandler(mockDagAM);
+
+    // getAllDAGs()
+    assertEquals(1, dagClientHandler.getAllDAGs().size());
+    assertEquals("dag_9999_0001_1", dagClientHandler.getAllDAGs().get(0));
+
+    // getDAGStatus
+    try {
+      dagClientHandler.getDAGStatus("dag_9999_0001_2", Sets.newSet(StatusGetOpts.GET_COUNTERS));
+      fail("should not come here");
+    } catch (TezException e) {
+      assertTrue(e.getMessage().contains("Unknown dagId"));
+    }
+    DAGStatus dagStatus = dagClientHandler.getDAGStatus("dag_9999_0001_1", 
+        Sets.newSet(StatusGetOpts.GET_COUNTERS));
+    assertEquals(mockDagStatusBuilder, dagStatus);
+
+    // getVertexStatus
+    try {
+      dagClientHandler.getVertexStatus("dag_9999_0001_2", "v1", Sets.newSet(StatusGetOpts.GET_COUNTERS));
+      fail("should not come here");
+    } catch (TezException e) {
+      assertTrue(e.getMessage().contains("Unknown dagId"));
+    }
+    VertexStatus vertexStatus = dagClientHandler.getVertexStatus("dag_9999_0001_1", "v1",
+        Sets.newSet(StatusGetOpts.GET_COUNTERS));
+    assertEquals(mockVertexStatusBuilder, vertexStatus);
+    
+    
+    // getSessionStatus
+    when(mockDagAM.isSession()).thenReturn(false);
+    try{
+      dagClientHandler.getSessionStatus();
+      fail("should not come here");
+    }catch(TezException e){
+      assertEquals("Unsupported operation as AM not running in session mode", e.getMessage());
+    }
+    when(mockDagAM.isSession()).thenReturn(true);
+    when(mockDagAM.getState()).thenReturn(DAGAppMasterState.INITED);
+    assertEquals(TezSessionStatus.INITIALIZING, dagClientHandler.getSessionStatus());
+    when(mockDagAM.getState()).thenReturn(DAGAppMasterState.ERROR);
+    assertEquals(TezSessionStatus.SHUTDOWN, dagClientHandler.getSessionStatus());
+    
+    
+    // startPreWarmContainers
+    PreWarmContext mockPreWarnContext = mock(PreWarmContext.class);
+    dagClientHandler.preWarmContainers(mockPreWarnContext);
+    verify(mockDagAM).startPreWarmContainers(mockPreWarnContext);
+    
+    // tryKillDAG
+    try{
+      dagClientHandler.tryKillDAG("dag_9999_0001_2");
+      fail("should not come here");
+    }catch(TezException e){
+      assertTrue(e.getMessage().contains("Unknown dagId"));
+    }
+    dagClientHandler.tryKillDAG("dag_9999_0001_1");
+    ArgumentCaptor<DAG> eventCaptor = ArgumentCaptor.forClass(DAG.class);
+    verify(mockDagAM, times(1)).tryKillDAG(eventCaptor.capture());
+    assertEquals(1, eventCaptor.getAllValues().size());
+    assertTrue(eventCaptor.getAllValues().get(0) instanceof DAG);
+    assertEquals("dag_9999_0001_1",  ((DAG)eventCaptor.getAllValues().get(0)).getID().toString());
+
+    // submitDAG
+    DAGPlan dagPlan = DAGPlan.getDefaultInstance();
+    Map<String,LocalResource> localResources = new HashMap<String, LocalResource>();
+    dagClientHandler.submitDAG(dagPlan, localResources);
+    verify(mockDagAM).submitDAGToAppMaster(dagPlan, localResources);
+    
+    // shutdown
+    dagClientHandler.shutdownAM();
+    verify(mockDagAM).shutdownTezAM();
+  }
+  
+}
\ No newline at end of file


Mime
View raw message