From commits-return-5776-archive-asf-public=cust-asf.ponee.io@tez.apache.org Thu Mar 8 00:52:52 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 78360180656 for ; Thu, 8 Mar 2018 00:52:51 +0100 (CET) Received: (qmail 84102 invoked by uid 500); 7 Mar 2018 23:52:50 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 84093 invoked by uid 99); 7 Mar 2018 23:52:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Mar 2018 23:52:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 42E7AE0182; Wed, 7 Mar 2018 23:52:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gopalv@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-3892: getClient API for TezClient (Eric Wohlstadter via Gopal V) Date: Wed, 7 Mar 2018 23:52:50 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master c34e46c73 -> 82d73b380 TEZ-3892: getClient API for TezClient (Eric Wohlstadter via Gopal V) Signed-off-by: Gopal V Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/82d73b38 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/82d73b38 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/82d73b38 Branch: refs/heads/master Commit: 82d73b380881ef8e7d6e6c963289c4f479bbea59 Parents: c34e46c Author: Eric Wohlstadter Authored: Wed Mar 7 15:50:45 2018 -0800 Committer: Gopal V Committed: Wed Mar 7 15:50:45 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/tez/client/TezClient.java | 230 +++++++++++++------ .../org/apache/tez/client/TezClientUtils.java | 21 +- .../org/apache/tez/client/TestTezClient.java | 64 +++++- .../org/apache/tez/examples/TezExampleBase.java | 26 ++- 4 files changed, 252 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/82d73b38/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 65ce0fb..d2c1af4 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -109,6 +109,9 @@ import com.google.protobuf.ServiceException; public class TezClient { private static final Logger LOG = LoggerFactory.getLogger(TezClient.class); + + private static final String appIdStrPrefix = "application"; + private static final String APPLICATION_ID_PREFIX = appIdStrPrefix + '_'; @VisibleForTesting static final String NO_CLUSTER_DIAGNOSTICS_MSG = "No cluster diagnostics found."; @@ -377,40 +380,14 @@ public class TezClient { */ public synchronized void start() throws TezException, IOException { amConfig.setYarnConfiguration(new YarnConfiguration(amConfig.getTezConfiguration())); - - frameworkClient = createFrameworkClient(); - frameworkClient.init(amConfig.getTezConfiguration(), amConfig.getYarnConfiguration()); - frameworkClient.start(); - - if (this.amConfig.getTezConfiguration().getBoolean( - TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED, - TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED_DEFAULT)) { - String javaOptsCheckerClassName = this.amConfig.getTezConfiguration().get( - TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_CLASS, ""); - if (!javaOptsCheckerClassName.isEmpty()) { - try { - javaOptsChecker = ReflectionUtils.createClazzInstance(javaOptsCheckerClassName); - } catch (Exception e) { - LOG.warn("Failed to initialize configured Java Opts Checker" - + " (" + TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_CLASS - + ") , checkerClass=" + javaOptsCheckerClassName - + ". Disabling checker.", e); - javaOptsChecker = null; - } - } else { - javaOptsChecker = new JavaOptsChecker(); - } - - } - + startFrameworkClient(); + setupJavaOptsChecker(); if (isSession) { LOG.info("Session mode. Starting session."); TezClientUtils.processTezLocalCredentialsFile(sessionCredentials, amConfig.getTezConfiguration()); - Map tezJarResources = getTezJarResources(sessionCredentials); - clientTimeout = amConfig.getTezConfiguration().getInt( TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS, TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT); @@ -420,23 +397,7 @@ public class TezClient { sessionAppId = createApplication(); } - // Add session token for shuffle - TezClientUtils.createSessionToken(sessionAppId.toString(), - jobTokenSecretManager, sessionCredentials); - - ApplicationSubmissionContext appContext = - TezClientUtils.createApplicationSubmissionContext( - sessionAppId, - null, clientName, amConfig, - tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo, - servicePluginsDescriptor, javaOptsChecker); - - // Set Tez Sessions to not retry on AM crashes if recovery is disabled - if (!amConfig.getTezConfiguration().getBoolean( - TezConfiguration.DAG_RECOVERY_ENABLED, - TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) { - appContext.setMaxAppAttempts(1); - } + ApplicationSubmissionContext appContext = setupApplicationContext(); frameworkClient.submitApplication(appContext); ApplicationReport appReport = frameworkClient.getApplicationReport(sessionAppId); LOG.info("The url to track the Tez Session: " + appReport.getTrackingUrl()); @@ -445,31 +406,136 @@ public class TezClient { throw new TezException(e); } - long amClientKeepAliveTimeoutIntervalMillis = - TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConfig.getTezConfiguration()); - // Poll at minimum of 1 second interval - long pollPeriod = TezCommonUtils. - getAMClientHeartBeatPollIntervalMillis(amConfig.getTezConfiguration(), - amClientKeepAliveTimeoutIntervalMillis, 10); - - boolean isLocal = amConfig.getTezConfiguration().getBoolean( - TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); - if (!isLocal && amClientKeepAliveTimeoutIntervalMillis > 0) { - amKeepAliveService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("AMKeepAliveThread #%d").build()); - amKeepAliveService.scheduleWithFixedDelay(new Runnable() { - - private DAGClientAMProtocolBlockingPB proxy; - - @Override - public void run() { - proxy = sendAMHeartbeat(proxy); - } - }, pollPeriod, pollPeriod, TimeUnit.MILLISECONDS); + startClientHeartbeat(); + this.stagingFs = FileSystem.get(amConfig.getTezConfiguration()); + } + } + + public synchronized TezClient getClient(String appIdStr) throws IOException, TezException { + return getClient(appIdfromString(appIdStr)); + } + + /** + * Alternative to start() that explicitly sets sessionAppId and doesn't start a new AM. + * The caller of getClient is responsible for initializing the new TezClient with a + * Configuration compatible with the existing AM. It is expected the caller has cached the + * original Configuration (e.g. in Zookeeper). + * + * In contrast to "start", no resources are localized. It is the responsibility of the caller to + * ensure that existing localized resources and staging dirs are still valid. + * + * @param appId + * @return 'this' just as a convenience for fluent style chaining + */ + public synchronized TezClient getClient(ApplicationId appId) throws TezException, IOException { + sessionAppId = appId; + amConfig.setYarnConfiguration(new YarnConfiguration(amConfig.getTezConfiguration())); + startFrameworkClient(); + setupJavaOptsChecker(); + + if (!isSession) { + String msg = "Must be in session mode to bind TezClient to existing AM"; + LOG.error(msg); + throw new IllegalStateException(msg); + } + + LOG.info("Session mode. Reconnecting to session: " + sessionAppId.toString()); + + clientTimeout = amConfig.getTezConfiguration().getInt( + TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS, + TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT); + + try { + setupApplicationContext(); + ApplicationReport appReport = frameworkClient.getApplicationReport(sessionAppId); + LOG.info("The url to track the Tez Session: " + appReport.getTrackingUrl()); + sessionStarted.set(true); + } catch (YarnException e) { + throw new TezException(e); + } + + startClientHeartbeat(); + this.stagingFs = FileSystem.get(amConfig.getTezConfiguration()); + return this; + } + + private void startFrameworkClient() { + frameworkClient = createFrameworkClient(); + frameworkClient.init(amConfig.getTezConfiguration(), amConfig.getYarnConfiguration()); + frameworkClient.start(); + } + + private ApplicationSubmissionContext setupApplicationContext() throws IOException, YarnException { + TezClientUtils.processTezLocalCredentialsFile(sessionCredentials, + amConfig.getTezConfiguration()); + + Map tezJarResources = getTezJarResources(sessionCredentials); + // Add session token for shuffle + TezClientUtils.createSessionToken(sessionAppId.toString(), + jobTokenSecretManager, sessionCredentials); + + ApplicationSubmissionContext appContext = + TezClientUtils.createApplicationSubmissionContext( + sessionAppId, + null, clientName, amConfig, + tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo, + servicePluginsDescriptor, javaOptsChecker); + + // Set Tez Sessions to not retry on AM crashes if recovery is disabled + if (!amConfig.getTezConfiguration().getBoolean( + TezConfiguration.DAG_RECOVERY_ENABLED, + TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) { + appContext.setMaxAppAttempts(1); + } + return appContext; + } + + private void setupJavaOptsChecker() { + if (this.amConfig.getTezConfiguration().getBoolean( + TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED, + TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED_DEFAULT)) { + String javaOptsCheckerClassName = this.amConfig.getTezConfiguration().get( + TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_CLASS, ""); + if (!javaOptsCheckerClassName.isEmpty()) { + try { + javaOptsChecker = ReflectionUtils.createClazzInstance(javaOptsCheckerClassName); + } catch (Exception e) { + LOG.warn("Failed to initialize configured Java Opts Checker" + + " (" + TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_CLASS + + ") , checkerClass=" + javaOptsCheckerClassName + + ". Disabling checker.", e); + javaOptsChecker = null; + } + } else { + javaOptsChecker = new JavaOptsChecker(); } - this.stagingFs = FileSystem.get(amConfig.getTezConfiguration()); + } + } + + private void startClientHeartbeat() { + long amClientKeepAliveTimeoutIntervalMillis = + TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConfig.getTezConfiguration()); + // Poll at minimum of 1 second interval + long pollPeriod = TezCommonUtils. + getAMClientHeartBeatPollIntervalMillis(amConfig.getTezConfiguration(), + amClientKeepAliveTimeoutIntervalMillis, 10); + + boolean isLocal = amConfig.getTezConfiguration().getBoolean( + TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); + if (!isLocal && amClientKeepAliveTimeoutIntervalMillis > 0) { + amKeepAliveService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("AMKeepAliveThread #%d").build()); + amKeepAliveService.scheduleWithFixedDelay(new Runnable() { + + private DAGClientAMProtocolBlockingPB proxy; + + @Override + public void run() { + proxy = sendAMHeartbeat(proxy); + } + }, pollPeriod, pollPeriod, TimeUnit.MILLISECONDS); } } @@ -1211,4 +1277,32 @@ public class TezClient { servicePluginsDescriptor); } } + + //Copied this helper method from + //org.apache.hadoop.yarn.api.records.ApplicationId in Hadoop 2.8+ + //to simplify implementation on 2.7.x + @Public + @Unstable + public static ApplicationId appIdfromString(String appIdStr) { + if (!appIdStr.startsWith(APPLICATION_ID_PREFIX)) { + throw new IllegalArgumentException("Invalid ApplicationId prefix: " + + appIdStr + ". The valid ApplicationId should start with prefix " + + appIdStrPrefix); + } + try { + int pos1 = APPLICATION_ID_PREFIX.length() - 1; + int pos2 = appIdStr.indexOf('_', pos1 + 1); + if (pos2 < 0) { + throw new IllegalArgumentException("Invalid ApplicationId: " + + appIdStr); + } + long rmId = Long.parseLong(appIdStr.substring(pos1 + 1, pos2)); + int appId = Integer.parseInt(appIdStr.substring(pos2 + 1)); + ApplicationId applicationId = ApplicationId.newInstance(rmId, appId); + return applicationId; + } catch (NumberFormatException n) { + throw new IllegalArgumentException("Invalid ApplicationId: " + + appIdStr, n); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/82d73b38/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index f9316e5..caf610d 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -575,16 +575,19 @@ public class TezClientUtils { } // emit conf as PB file - ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf, - servicePluginsDescriptor); + // don't overwrite existing conf, needed for TezClient.getClient() so existing containers have stable resource fingerprints + if(!binaryConfPath.getFileSystem(tezConf).exists(binaryConfPath)) { + ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf, + servicePluginsDescriptor); - FSDataOutputStream amConfPBOutBinaryStream = null; - try { - amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryConfPath); - finalConfProto.writeTo(amConfPBOutBinaryStream); - } finally { - if(amConfPBOutBinaryStream != null){ - amConfPBOutBinaryStream.close(); + FSDataOutputStream amConfPBOutBinaryStream = null; + try { + amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryConfPath); + finalConfProto.writeTo(amConfPBOutBinaryStream); + } finally { + if (amConfPBOutBinaryStream != null) { + amConfPBOutBinaryStream.close(); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/82d73b38/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 89310df..0cbef76 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -168,12 +168,12 @@ public class TestTezClient { @Test (timeout = 5000) public void testTezclientApp() throws Exception { - testTezClient(false); + testTezClient(false, true); } @Test (timeout = 5000) public void testTezclientSession() throws Exception { - testTezClient(true); + testTezClient(true, true); } @Test (timeout = 5000) @@ -238,8 +238,51 @@ public class TestTezClient { assertTrue(request.hasAdditionalAmResources()); } } + + @Test (timeout = 5000) + public void testGetClient() throws Exception { + /* BEGIN first TezClient usage without calling stop() */ + TezClientForTest client = testTezClient(true, false); + /* END first TezClient usage without calling stop() */ + + /* BEGIN reuse of AM from new TezClient */ + ArgumentCaptor captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class); + when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) + .thenReturn(YarnApplicationState.RUNNING); + + //Reuse existing appId from first TezClient + ApplicationId existingAppId = client.mockAppId; + TezClientForTest client2 = configureAndCreateTezClient(null, true, + client.amConfig.getTezConfiguration()); + String mockLR1Name = "LR1"; + Map lrDAG = Collections.singletonMap(mockLR1Name, LocalResource + .newInstance(URL.newInstance("file", "localhost", 0, "/test1"), LocalResourceType.FILE, + LocalResourceVisibility.PUBLIC, 1, 1)); + Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1, + Resource.newInstance(1, 1)); + DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG); + + //Bind TezClient to existing app and submit a dag + DAGClient dagClient = client2.getClient(existingAppId).submitDAG(dag); + + assertTrue(dagClient.getExecutionContext().contains(existingAppId.toString())); + assertEquals(dagClient.getSessionIdentifierString(), existingAppId.toString()); + + // Validate request for new AM is not submitted to RM */ + verify(client2.mockYarnClient, times(0)).submitApplication(captor.capture()); + + // Validate dag submission from second TezClient as normal */ + verify(client2.sessionAmProxy, times(1)).submitDAG((RpcController)any(), (SubmitDAGRequestProto) any()); + + // Validate stop from new TezClient as normal */ + client2.stop(); + verify(client2.sessionAmProxy, times(1)).shutdownSession((RpcController) any(), + (ShutdownSessionRequestProto) any()); + verify(client2.mockYarnClient, times(1)).stop(); + /* END reuse of AM from new TezClient */ + } - public void testTezClient(boolean isSession) throws Exception { + public TezClientForTest testTezClient(boolean isSession, boolean shouldStop) throws Exception { Map lrs = Maps.newHashMap(); String lrName1 = "LR1"; lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"), @@ -343,13 +386,16 @@ public class TestTezClient { assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( lrName2)); } - - client.stop(); - if (isSession) { - verify(client.sessionAmProxy, times(1)).shutdownSession((RpcController) any(), - (ShutdownSessionRequestProto) any()); + + if(shouldStop) { + client.stop(); + if (isSession) { + verify(client.sessionAmProxy, times(1)).shutdownSession((RpcController) any(), + (ShutdownSessionRequestProto) any()); + } + verify(client.mockYarnClient, times(1)).stop(); } - verify(client.mockYarnClient, times(1)).stop(); + return client; } @Test (timeout=5000) http://git-wip-us.apache.org/repos/asf/tez/blob/82d73b38/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java index a3c0224..6b626b1 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java @@ -63,11 +63,16 @@ public abstract class TezExampleBase extends Configured implements Tool { protected static final String LOCAL_MODE = "local"; protected static final String COUNTER_LOG = "counter"; protected static final String GENERATE_SPLIT_IN_CLIENT = "generateSplitInClient"; + protected static final String LEAVE_AM_RUNNING = "leaveAmRunning"; + protected static final String RECONNECT_APP_ID = "reconnectAppId"; + private boolean disableSplitGrouping = false; private boolean isLocalMode = false; private boolean isCountersLog = false; private boolean generateSplitInClient = false; + private boolean leaveAmRunning = false; + private String reconnectAppId; private HadoopShim hadoopShim; protected boolean isCountersLog() { @@ -88,6 +93,8 @@ public abstract class TezExampleBase extends Configured implements Tool { options.addOption(DISABLE_SPLIT_GROUPING, false , "disable split grouping"); options.addOption(COUNTER_LOG, false , "print counter log"); options.addOption(GENERATE_SPLIT_IN_CLIENT, false, "whether generate split in client"); + options.addOption(LEAVE_AM_RUNNING, false, "whether client should stop session"); + options.addOption(RECONNECT_APP_ID, true, "appId for client reconnect"); return options; } @@ -108,6 +115,12 @@ public abstract class TezExampleBase extends Configured implements Tool { if (optionParser.getCommandLine().hasOption(GENERATE_SPLIT_IN_CLIENT)) { generateSplitInClient = true; } + if (optionParser.getCommandLine().hasOption(LEAVE_AM_RUNNING)) { + leaveAmRunning = true; + } + if (optionParser.getCommandLine().hasOption(RECONNECT_APP_ID)) { + reconnectAppId = optionParser.getCommandLine().getOptionValue(RECONNECT_APP_ID); + } hadoopShim = new HadoopShimsLoader(conf).getHadoopShim(); return _execute(otherArgs, null, null); @@ -231,15 +244,20 @@ public abstract class TezExampleBase extends Configured implements Tool { try { return runJob(otherArgs, tezConf, tezClientInternal); } finally { - if (ownTezClient && tezClientInternal != null) { + if (ownTezClient && tezClientInternal != null && !leaveAmRunning) { tezClientInternal.stop(); } } } private TezClient createTezClient(TezConfiguration tezConf) throws IOException, TezException { - TezClient tezClient = TezClient.create(getClass().getSimpleName(), tezConf); - tezClient.start(); + TezClient tezClient = TezClient.create("TezExampleApplication", tezConf); + if(reconnectAppId != null) { + ApplicationId appId = TezClient.appIdfromString(reconnectAppId); + tezClient.getClient(appId); + } else { + tezClient.start(); + } return tezClient; } @@ -265,6 +283,8 @@ public abstract class TezExampleBase extends Configured implements Tool { + " enable split grouping without this option."); ps.println("-" + COUNTER_LOG + "\t\t to print counters information"); ps.println("-" + GENERATE_SPLIT_IN_CLIENT + "\t\tgenerate input split in client"); + ps.println("-" + LEAVE_AM_RUNNING + "\t\twhether client should stop session"); + ps.println("-" + RECONNECT_APP_ID + "\t\tappId for client reconnect"); ps.println(); ps.println("The Tez example extra options usage syntax is "); ps.println("example_name [extra_options] [example_parameters]");