tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-3155. Support a way to submit DAGs to a session where the DAG plan exceeds hadoop ipc limits. (Zhiyuan Yang via hitesh)
Date Thu, 10 Mar 2016 01:41:58 GMT
Repository: tez
Updated Branches:
  refs/heads/master e8269c270 -> dbd763fd4


TEZ-3155. Support a way to submit DAGs to a session where the DAG plan exceeds hadoop ipc
limits. (Zhiyuan Yang via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dbd763fd
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dbd763fd
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dbd763fd

Branch: refs/heads/master
Commit: dbd763fd479ccebf3988d23f3284fe1ec2f16d64
Parents: e8269c2
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed Mar 9 17:41:18 2016 -0800
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Wed Mar 9 17:41:18 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/tez/client/TezClient.java   |  35 ++++-
 .../apache/tez/dag/api/TezConfiguration.java    |  10 ++
 .../src/main/proto/DAGClientAMProtocol.proto    |   1 +
 .../org/apache/tez/client/TestTezClient.java    |  69 ++++++++++
 .../tez/dag/api/client/DAGClientServer.java     |   7 +-
 ...DAGClientAMProtocolBlockingPBServerImpl.java |  16 ++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  26 ++--
 .../tez/dag/api/client/TestDAGClientServer.java |   3 +-
 ...DAGClientAMProtocolBlockingPBServerImpl.java | 135 +++++++++++++++++++
 10 files changed, 284 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d01c732..3c72884 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-3155. Support a way to submit DAGs to a session where the DAG plan exceeds hadoop ipc limits.
   TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch
   TEZ-3140. Reduce AM memory usage during serialization
   TEZ-2756. MergeManager close should not try merging files on close if invoked after a shuffle exception.

http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index fc98d1a..be59e2f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -22,11 +22,17 @@ import java.io.IOException;
 import java.text.NumberFormat;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.tez.common.JavaOptsChecker;
 import org.apache.tez.common.RPCUtil;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.slf4j.Logger;
@@ -128,6 +134,12 @@ public class TezClient {
 
   private int preWarmDAGCounter = 0;
 
+  /* max submitDAG request size through IPC; beyond this we transfer them in the same way we transfer local resource */
+  private int maxSubmitDAGRequestSizeThroughIPC;
+  /* this counter counts number of serialized DAGPlan and is used to give unique name to each serialized DAGPlan */
+  private AtomicInteger serializedSubmitDAGPlanRequestCounter = new AtomicInteger(0);
+  private FileSystem stagingFs = null;
+
   private static final String atsHistoryLoggingServiceClassName =
       "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
   private static final String atsHistoryACLManagerClassName =
@@ -169,6 +181,10 @@ public class TezClient {
     this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
     this.apiVersionInfo = new TezApiVersionInfo();
     this.servicePluginsDescriptor = servicePluginsDescriptor;
+    this.maxSubmitDAGRequestSizeThroughIPC = tezConf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT) -
+        tezConf.getInt(TezConfiguration.TEZ_IPC_PAYLOAD_RESERVED_BYTES,
+        TezConfiguration.TEZ_IPC_PAYLOAD_RESERVED_BYTES_DEFAULT);
     Limits.setConfiguration(tezConf);
 
     LOG.info("Tez Client Version: " + apiVersionInfo.toString());
@@ -430,6 +446,8 @@ public class TezClient {
       } catch (YarnException e) {
         throw new TezException(e);
       }
+
+      this.stagingFs = FileSystem.get(amConfig.getTezConfiguration());
     }
   }
   
@@ -507,7 +525,7 @@ public class TezClient {
         javaOptsChecker);
 
     SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
-    requestBuilder.setDAGPlan(dagPlan).build();
+    requestBuilder.setDAGPlan(dagPlan);
     if (!additionalLocalResources.isEmpty()) {
       requestBuilder.setAdditionalAmResources(DagTypeConverters
           .convertFromLocalResources(additionalLocalResources));
@@ -515,6 +533,19 @@ public class TezClient {
     
     additionalLocalResources.clear();
 
+    // if request size exceeds maxSubmitDAGRequestSizeThroughIPC, we serialize them to HDFS
+    SubmitDAGRequestProto request = requestBuilder.build();
+    if (request.getSerializedSize() > maxSubmitDAGRequestSizeThroughIPC) {
+      Path dagPlanPath = new Path(TezCommonUtils.getTezSystemStagingPath(amConfig.getTezConfiguration(),
+          sessionAppId.toString()), TezConstants.TEZ_PB_PLAN_BINARY_NAME +
+          serializedSubmitDAGPlanRequestCounter.incrementAndGet());
+
+      try (FSDataOutputStream fsDataOutputStream = stagingFs.create(dagPlanPath, false)) {
+        request.writeTo(fsDataOutputStream);
+        request = requestBuilder.clear().setSerializedRequestPath(stagingFs.resolvePath(dagPlanPath).toString()).build();
+      }
+    }
+
     DAGClientAMProtocolBlockingPB proxy = null;
     try {
       proxy = waitForProxy();
@@ -533,7 +564,7 @@ public class TezClient {
     }
 
     try {
-      SubmitDAGResponseProto response = proxy.submitDAG(null, requestBuilder.build());
+      SubmitDAGResponseProto response = proxy.submitDAG(null, request);
       // the following check is only for testing since the final class
       // SubmitDAGResponseProto cannot be mocked
       if (response != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 221ac47..0221e6b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1528,6 +1528,16 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_CLIENT_ASYNCHRONOUS_STOP = TEZ_PREFIX + "client.asynchronous-stop";
   public static final boolean TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT = true;
 
+  /**
+   * Int value. SubmitDAGPlanRequest cannot be larger than Max IPC message size minus this number; otherwise, it will
+   * be serialized to HDFS and we transfer the path to server. Server will deserialize the request from HDFS.
+   */
+  @Private
+  @ConfigurationScope(Scope.CLIENT)
+  @ConfigurationProperty(type="int")
+  public static final String TEZ_IPC_PAYLOAD_RESERVED_BYTES = TEZ_PREFIX + "ipc.payload.reserved.bytes";
+  public static final int TEZ_IPC_PAYLOAD_RESERVED_BYTES_DEFAULT = 5 * 1024 * 1024;
+
   // for Recovery Test
   @Private
   @ConfigurationScope(Scope.TEST)

http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
index a8171e7..113c9cc 100644
--- a/tez-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -63,6 +63,7 @@ message TryKillDAGResponseProto {
 message SubmitDAGRequestProto {
   optional DAGPlan d_a_g_plan = 1;
   optional PlanLocalResourcesProto additional_am_resources = 2;
+  optional string serializedRequestPath = 3;
 }
 
 message SubmitDAGResponseProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index c2531c6..a2e4956 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -19,6 +19,8 @@
 package org.apache.tez.client;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -42,6 +44,9 @@ import static org.mockito.Mockito.when;
 
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -64,6 +69,7 @@ import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
@@ -155,6 +161,69 @@ public class TestTezClient {
   public void testTezclientSession() throws Exception {
     testTezClient(true);
   }
+
+  @Test (timeout = 5000)
+  public void testTezClientSessionLargeDAGPlan() throws Exception {
+    // request size is within threshold of being serialized
+    _testTezClientSessionLargeDAGPlan(10*1024*1024, 10, 10, false);
+    // DAGPlan exceeds the threshold but is still less than max IPC size
+    _testTezClientSessionLargeDAGPlan(10*1024*1024, 6*1024*1024, 10, true);
+    // DAGPlan exceeds max IPC size
+    _testTezClientSessionLargeDAGPlan(10*1024*1024, 15*1024*1024, 10, true);
+    // amResources exceeds the threshold but is still less than max IPC size
+    _testTezClientSessionLargeDAGPlan(10*1024*1024, 10, 6*1024*1024, true);
+    // amResources exceeds max IPC size
+    _testTezClientSessionLargeDAGPlan(10*1024*1024, 10, 15*1024*1024, true);
+    // DAGPlan and amResources together exceed threshold but less than IPC size
+    _testTezClientSessionLargeDAGPlan(10*1024*1024, 3*1024*1024, 3*1024*1024, true);
+    // DAGPlan and amResources all exceed max IPC size
+    _testTezClientSessionLargeDAGPlan(10*1024*1024, 15*1024*1024, 15*1024*1024, true);
+  }
+
+  private void _testTezClientSessionLargeDAGPlan(int maxIPCMsgSize, int payloadSize, int amResourceSize,
+                                               boolean shouldSerialize) throws Exception {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, maxIPCMsgSize);
+    conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, "target/"+this.getClass().getName());
+    TezClientForTest client = configureAndCreateTezClient(null, true, conf);
+
+    Map<String, LocalResource> localResourceMap = new HashMap<>();
+    byte[] bytes = new byte[amResourceSize];
+    Arrays.fill(bytes, (byte)1);
+    String lrName = new String(bytes);
+    localResourceMap.put(lrName, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+
+    ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("P");
+    processorDescriptor.setUserPayload(UserPayload.create(ByteBuffer.allocate(payloadSize)));
+    Vertex vertex = Vertex.create("Vertex", processorDescriptor, 1, Resource.newInstance(1, 1));
+    DAG dag = DAG.create("DAG").addVertex(vertex);
+
+    client.start();
+    client.addAppMasterLocalFiles(localResourceMap);
+    client.submitDAG(dag);
+    client.stop();
+
+    ArgumentCaptor<SubmitDAGRequestProto> captor = ArgumentCaptor.forClass(SubmitDAGRequestProto.class);
+    verify(client.sessionAmProxy).submitDAG((RpcController)any(), captor.capture());
+    SubmitDAGRequestProto request = captor.getValue();
+
+    if (shouldSerialize) {
+      /* we need manually delete the serialized dagplan since staging path here won't be destroyed */
+      Path dagPlanPath = new Path(request.getSerializedRequestPath());
+      FileSystem fs = FileSystem.getLocal(conf);
+      fs.deleteOnExit(dagPlanPath);
+      fs.delete(dagPlanPath, false);
+
+      assertTrue(request.hasSerializedRequestPath());
+      assertFalse(request.hasDAGPlan());
+      assertFalse(request.hasAdditionalAmResources());
+    } else {
+      assertFalse(request.hasSerializedRequestPath());
+      assertTrue(request.hasDAGPlan());
+      assertTrue(request.hasAdditionalAmResources());
+    }
+  }
   
   public void testTezClient(boolean isSession) throws Exception {
     Map<String, LocalResource> lrs = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
index 029761c..38f6740 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -50,12 +51,14 @@ public class DAGClientServer extends AbstractService {
   DAGClientHandler realInstance;
   Server server;
   InetSocketAddress bindAddress;
+  final FileSystem stagingFs;
 
   public DAGClientServer(DAGClientHandler realInstance,
-      ApplicationAttemptId attemptId) {
+      ApplicationAttemptId attemptId, FileSystem stagingFs) {
     super("DAGClientRPCServer");
     this.realInstance = realInstance;
     this.secretManager = new ClientToAMTokenSecretManager(attemptId, null);
+    this.stagingFs = stagingFs;
   }
 
   @Override
@@ -65,7 +68,7 @@ public class DAGClientServer extends AbstractService {
       InetSocketAddress addr = new InetSocketAddress(0);
 
       DAGClientAMProtocolBlockingPBServerImpl service =
-          new DAGClientAMProtocolBlockingPBServerImpl(realInstance);
+          new DAGClientAMProtocolBlockingPBServerImpl(realInstance, stagingFs);
 
       BlockingService blockingService =
                 DAGClientAMProtocol.newReflectiveBlockingService(service);

http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index fc6b267..32124b9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -23,6 +23,9 @@ import java.security.AccessControlException;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezAppMasterStatus;
@@ -55,9 +58,11 @@ import com.google.protobuf.ServiceException;
 public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProtocolBlockingPB {
 
   DAGClientHandler real;
+  final FileSystem stagingFs;
 
-  public DAGClientAMProtocolBlockingPBServerImpl(DAGClientHandler real) {
+  public DAGClientAMProtocolBlockingPBServerImpl(DAGClientHandler real, FileSystem stagingFs) {
     this.real = real;
+    this.stagingFs = stagingFs;
   }
 
   private UserGroupInformation getRPCUser() throws ServiceException {
@@ -152,6 +157,15 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
       throw new AccessControlException("User " + user + " cannot perform AM modify operation");
     }
     try{
+      if (request.hasSerializedRequestPath()) {
+        // need to deserialize large request from hdfs
+        Path requestPath = new Path(request.getSerializedRequestPath());
+        try (FSDataInputStream fsDataInputStream = stagingFs.open(requestPath)) {
+          request = SubmitDAGRequestProto.parseFrom(fsDataInputStream);
+        } catch (IOException e) {
+          throw wrapException(e);
+        }
+      }
       DAGPlan dagPlan = request.getDAGPlan();
       Map<String, LocalResource> additionalResources = null;
       if (request.hasAdditionalAmResources()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 81a7791..eb9660c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -482,7 +482,19 @@ public class DAGAppMaster extends AbstractService {
 
     addIfService(dispatcher, false);
 
-    clientRpcServer = new DAGClientServer(clientHandler, appAttemptID);
+    recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, conf);
+    recoveryFS = recoveryDataDir.getFileSystem(conf);
+    currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
+        appAttemptID.getAttemptId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID
+          + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
+          + " recoveryAttemptDir :" + currentRecoveryDataDir);
+    }
+    recoveryEnabled = conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
+        TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
+
+    clientRpcServer = new DAGClientServer(clientHandler, appAttemptID, recoveryFS);
     addIfService(clientRpcServer, true);
 
     taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
@@ -589,18 +601,6 @@ public class DAGAppMaster extends AbstractService {
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
 
-    recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, conf);
-    recoveryFS = recoveryDataDir.getFileSystem(conf);
-    currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
-        appAttemptID.getAttemptId());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID
-          + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
-          + " recoveryAttemptDir :" + currentRecoveryDataDir);
-    }
-    recoveryEnabled = conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
-        TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
-
     if (!versionMismatch) {
       if (isSession) {
         FileInputStream sessionResourcesStream = null;

http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientServer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientServer.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientServer.java
index bf57cc1..06280d8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientServer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientServer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -55,9 +56,9 @@ public class TestDAGClientServer {
     try {
       DAGClientHandler mockDAGClientHander = mock(DAGClientHandler.class);
       ApplicationAttemptId mockAppAttempId = mock(ApplicationAttemptId.class);
-      clientServer = new DAGClientServer(mockDAGClientHander, mockAppAttempId);
       Configuration conf = new Configuration();
       conf.set(TezConfiguration.TEZ_AM_CLIENT_AM_PORT_RANGE, port + "-" + port);
+      clientServer = new DAGClientServer(mockDAGClientHander, mockAppAttempId, mock(FileSystem.class));
       clientServer.init(conf);
       clientServer.start();
       int resultedPort = clientServer.getBindAddress().getPort();

http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java
new file mode 100644
index 0000000..3c030b3
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client.rpc;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClientHandler;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+public class TestDAGClientAMProtocolBlockingPBServerImpl {
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder(new File("target"));
+
+  @Captor
+  private ArgumentCaptor<Map<String, LocalResource>> localResourcesCaptor;
+
+  @Before
+  public void init() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test(timeout = 5000)
+  @SuppressWarnings("unchecked")
+  public void testSubmitDagInSessionWithLargeDagPlan() throws Exception {
+    int maxIPCMsgSize = 1024;
+    String dagPlanName = "dagplan-name";
+    File requestFile = tmpFolder.newFile("request-file");
+    TezConfiguration conf = new TezConfiguration();
+    conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, maxIPCMsgSize);
+
+    byte[] randomBytes = new byte[2*maxIPCMsgSize];
+    (new Random()).nextBytes(randomBytes);
+    UserPayload payload = UserPayload.create(ByteBuffer.wrap(randomBytes));
+    Vertex vertex = Vertex.create("V", ProcessorDescriptor.create("P").setUserPayload(payload), 1);
+    DAGPlan dagPlan = DAG.create(dagPlanName).addVertex(vertex).createDag(conf, null, null, null, false);
+
+    String lrName = "localResource";
+    String scheme = "file";
+    String host = "localhost";
+    int port = 80;
+    String path = "/test";
+    URL lrURL = URL.newInstance(scheme, host, port, path);
+    LocalResource localResource = LocalResource.newInstance(lrURL, LocalResourceType.FILE,
+        LocalResourceVisibility.PUBLIC, 1, 1);
+    Map<String, LocalResource> localResources = new HashMap<>();
+    localResources.put(lrName, localResource);
+
+    SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan)
+        .setAdditionalAmResources(DagTypeConverters.convertFromLocalResources(localResources));
+    try (FileOutputStream fileOutputStream = new FileOutputStream(requestFile)) {
+      requestBuilder.build().writeTo(fileOutputStream);
+    }
+
+    DAGClientHandler dagClientHandler = mock(DAGClientHandler.class);
+    ACLManager aclManager = mock(ACLManager.class);
+    DAGClientAMProtocolBlockingPBServerImpl serverImpl = spy(new DAGClientAMProtocolBlockingPBServerImpl(
+        dagClientHandler, FileSystem.get(conf)));
+    when(dagClientHandler.getACLManager()).thenReturn(aclManager);
+    when(dagClientHandler.submitDAG((DAGPlan)any(), (Map<String, LocalResource>)any())).thenReturn("dag-id");
+    when(aclManager.checkAMModifyAccess((UserGroupInformation) any())).thenReturn(true);
+
+    requestBuilder.clear().setSerializedRequestPath(requestFile.getAbsolutePath());
+    serverImpl.submitDAG(null, requestBuilder.build());
+
+    ArgumentCaptor<DAGPlan> dagPlanCaptor = ArgumentCaptor.forClass(DAGPlan.class);
+    verify(dagClientHandler).submitDAG(dagPlanCaptor.capture(), localResourcesCaptor.capture());
+    dagPlan = dagPlanCaptor.getValue();
+    localResources = localResourcesCaptor.getValue();
+
+    assertEquals(dagPlan.getName(), dagPlanName);
+    assertEquals(dagPlan.getVertexCount(), 1);
+    assertTrue(dagPlan.getSerializedSize() > maxIPCMsgSize);
+    assertArrayEquals(randomBytes, dagPlan.getVertex(0).getProcessorDescriptor().getTezUserPayload().getUserPayload().
+        toByteArray());
+    assertEquals(localResources.size(), 1);
+    assertTrue(localResources.containsKey(lrName));
+    localResource = localResources.get(lrName);
+    assertEquals(localResource.getType(), LocalResourceType.FILE);
+    assertEquals(localResource.getVisibility(), LocalResourceVisibility.PUBLIC);
+    lrURL = localResource.getResource();
+    assertEquals(lrURL.getScheme(), scheme);
+    assertEquals(lrURL.getHost(), host);
+    assertEquals(lrURL.getPort(), port);
+    assertEquals(lrURL.getFile(), path);
+  }
+}


Mime
View raw message