tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject tez git commit: TEZ-3943. TezClient leaks DAGClient for prewarm (Sergey Shelukhin via jlowe)
Date Tue, 29 May 2018 19:27:56 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.9 364ae4faf -> c369ba659


TEZ-3943. TezClient leaks DAGClient for prewarm (Sergey Shelukhin via jlowe)

(cherry picked from commit cf0302c429b1b24b75371374e6676376decc34b0)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c369ba65
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c369ba65
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c369ba65

Branch: refs/heads/branch-0.9
Commit: c369ba659c56a8fe7c32d4972cb99590400644b8
Parents: 364ae4f
Author: Jason Lowe <jlowe@apache.org>
Authored: Tue May 29 14:23:03 2018 -0500
Committer: Jason Lowe <jlowe@apache.org>
Committed: Tue May 29 14:26:24 2018 -0500

----------------------------------------------------------------------
 .../java/org/apache/tez/client/TezClient.java   | 24 ++++++++++++++++++--
 .../org/apache/tez/client/TestTezClient.java    | 21 ++++++++++++++++-
 2 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c369ba65/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index d2c1af4..9dd4a69 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -142,7 +142,7 @@ public class TezClient {
   @VisibleForTesting
   final ServicePluginsDescriptor servicePluginsDescriptor;
   private JavaOptsChecker javaOptsChecker = null;
-
+  private DAGClient prewarmDagClient = null;
   private int preWarmDAGCounter = 0;
 
   /* max submitDAG request size through IPC; beyond this we transfer them in the same way we transfer local resource */
@@ -591,6 +591,25 @@ public class TezClient {
     }
   }
 
+  private void closePrewarmDagClient() {
+    if (prewarmDagClient == null) {
+      return;
+    }
+    try {
+       prewarmDagClient.tryKillDAG();
+       LOG.info("Waiting for prewarm DAG to shut down");
+       prewarmDagClient.waitForCompletion();
+    } catch (Exception ex) {
+       LOG.warn("Failed to shut down the prewarm DAG " + prewarmDagClient, ex);
+    }
+    try {
+      prewarmDagClient.close();
+    } catch (Exception e) {
+      LOG.warn("Failed to close prewarm DagClient " + prewarmDagClient, e);
+    }
+    prewarmDagClient = null;
+  }
+  
   private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
     Preconditions.checkState(isSession == true, 
         "submitDAG with additional resources applies to only session mode. " + 
@@ -693,6 +712,7 @@ public class TezClient {
    * @throws IOException
    */
   public synchronized void stop() throws TezException, IOException {
+    closePrewarmDagClient();
     try {
       if (amKeepAliveService != null) {
         amKeepAliveService.shutdownNow();
@@ -925,7 +945,7 @@ public class TezClient {
           "available", e);
     }
     if(isReady) {
-      submitDAG(dag);
+      prewarmDagClient = submitDAG(dag);
     } else {
       throw new SessionNotReady("Tez AM not ready, could not submit DAG");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/c369ba65/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 2c04061..e959a55 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -38,6 +38,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.atLeast;
@@ -87,10 +88,15 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
+import org.apache.tez.dag.api.records.DAGProtos.ProgressProto;
 import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
@@ -405,7 +411,7 @@ public class TestTezClient {
     client.start();
 
     when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
-    .thenReturn(YarnApplicationState.RUNNING);
+        .thenReturn(YarnApplicationState.RUNNING);
     
     when(
         client.sessionAmProxy.getAMStatus((RpcController) any(), (GetAMStatusRequestProto) any()))
@@ -419,9 +425,21 @@ public class TestTezClient {
     SubmitDAGRequestProto proto = captor1.getValue();
     assertTrue(proto.getDAGPlan().getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX));
 
+    setClientToReportStoppedDags(client);
     client.stop();
   }
 
+  private void setClientToReportStoppedDags(TezClientForTest client) throws Exception {
+    when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+      .thenReturn(YarnApplicationState.FINISHED);
+    when(client.sessionAmProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
+      .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(DAGStatusProto.newBuilder()
+          .addDiagnostics("Diagnostics_0").setState(DAGStatusStateProto.DAG_SUCCEEDED)
+          .setDAGProgress(ProgressProto.newBuilder()
+                  .setFailedTaskCount(0).setKilledTaskCount(0).setRunningTaskCount(0)
+                  .setSucceededTaskCount(1).setTotalTaskCount(1).build()).build()).build());
+  }
+
   @Test (timeout=30000)
   public void testPreWarmWithTimeout() throws Exception {
     long startTime = 0 , endTime = 0;
@@ -506,6 +524,7 @@ public class TestTezClient {
     assertTrue("Time taken is not as expected",
         (endTime - startTime) <= timeout);
     verify(spyClient, times(2)).submitDAG(any(DAG.class));
+    setClientToReportStoppedDags(client);
     spyClient.stop();
     client.stop();
   }


Mime
View raw message