hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [33/50] [abbrv] hadoop git commit: YARN-6669. Implemented Kerberos security for YARN service framework. (Contributed by Jian He)
Date Thu, 07 Dec 2017 00:54:18 GMT
YARN-6669.  Implemented Kerberos security for YARN service framework.  (Contributed by Jian He)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d30d5782
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d30d5782
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d30d5782

Branch: refs/heads/HDFS-7240
Commit: d30d57828fddaa8667de49af879cde999907c7f6
Parents: 404eab4
Author: Eric Yang <eyang@apache.org>
Authored: Mon Dec 4 15:11:00 2017 -0500
Committer: Eric Yang <eyang@apache.org>
Committed: Mon Dec 4 15:11:00 2017 -0500

----------------------------------------------------------------------
 .../hadoop/yarn/service/webapp/ApiServer.java   |   1 +
 ...RN-Simplified-V1-API-Layer-For-Services.yaml |  16 +
 .../dev-support/findbugs-exclude.xml            |   5 +-
 .../yarn/service/ClientAMPolicyProvider.java    |  39 ++
 .../yarn/service/ClientAMSecurityInfo.java      |  62 ++++
 .../hadoop/yarn/service/ClientAMService.java    |   9 +
 .../hadoop/yarn/service/ServiceContext.java     |   8 +
 .../hadoop/yarn/service/ServiceMaster.java      | 140 ++++++-
 .../hadoop/yarn/service/ServiceScheduler.java   |  30 +-
 .../service/api/records/KerberosPrincipal.java  | 146 ++++++++
 .../yarn/service/api/records/Service.java       |  23 ++
 .../yarn/service/client/ServiceClient.java      | 174 ++++++---
 .../yarn/service/component/Component.java       |   2 +-
 .../yarn/service/conf/YarnServiceConf.java      |   7 -
 .../yarn/service/conf/YarnServiceConstants.java |   3 +
 .../containerlaunch/AbstractLauncher.java       |  39 +-
 .../containerlaunch/ContainerLaunchService.java |  10 +-
 .../containerlaunch/CredentialUtils.java        | 319 ----------------
 .../hadoop/yarn/service/package-info.java       |  24 ++
 .../yarn/service/provider/ProviderUtils.java    |  53 +--
 .../yarn/service/utils/ServiceApiUtil.java      |  15 +
 .../hadoop/yarn/service/utils/ServiceUtils.java |  31 +-
 .../org.apache.hadoop.security.SecurityInfo     |  14 +
 .../client/api/RegistryOperationsFactory.java   |  21 ++
 .../registry/client/impl/zk/CuratorService.java |   8 +-
 .../client/impl/zk/RegistrySecurity.java        |  96 ++++-
 .../hadoop/registry/server/dns/RegistryDNS.java |   4 +
 .../RMRegistryOperationsService.java            | 246 -------------
 .../services/DeleteCompletionCallback.java      |   3 +-
 .../hadoop/registry/AbstractRegistryTest.java   |  15 +-
 .../integration/TestRegistryRMOperations.java   | 369 -------------------
 .../secure/TestSecureRMRegistryOperations.java  | 348 -----------------
 .../site/markdown/yarn-service/QuickStart.md    |  20 +-
 .../markdown/yarn-service/YarnServiceAPI.md     |  11 +-
 34 files changed, 844 insertions(+), 1467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
index 1bb6c93..34ab8f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
@@ -173,6 +173,7 @@ public class ApiServer {
       return Response.status(Status.BAD_REQUEST).entity(serviceStatus)
           .build();
     } catch (Exception e) {
+      LOG.error("Fail to stop service:", e);
       ServiceStatus serviceStatus = new ServiceStatus();
       serviceStatus.setDiagnostics(e.getMessage());
       return Response.status(Status.INTERNAL_SERVER_ERROR)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
index 088b50c..979883c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
@@ -244,6 +244,10 @@ definitions:
       queue:
         type: string
         description: The YARN queue that this service should be submitted to.
+      kerberos_principal:
+        description: The Kerberos Principal of the service
+        $ref: '#/definitions/KerberosPrincipal'
+
   Resource:
     description:
       Resource determines the amount of resources (vcores, memory, network, etc.) usable by a container. This field determines the resource to be applied for all the containers of a component or service. The resource specified at the service (or global) level can be overriden at the component level. Only one of profile OR cpu & memory are expected. It raises a validation exception otherwise.
@@ -469,3 +473,15 @@ definitions:
         type: integer
         format: int32
         description: An error code specific to a scenario which service owners should be able to use to understand the failure in addition to the diagnostic information.
+  KerberosPrincipal:
+    description: The kerberos principal info of the user who launches the service.
+    properties:
+      principal_name:
+        type: string
+        description: The principal name of the user who launches the service.
+      keytab:
+        type: string
+        description: |
+          The URI of the kerberos keytab. It supports two modes:
+          URI starts with "hdfs://": A path on hdfs where the keytab is stored. The keytab will be localized by YARN to each host.
+          URI starts with "file://": A path on the local host where the keytab is stored. It is assumed that the keytabs are pre-installed by admins before AM launches.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
index 2814cca..80c04c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
@@ -44,5 +44,8 @@
         <Field name="registryClient" />
         <Bug pattern="IS2_INCONSISTENT_SYNC"/>
     </Match>
-
+    <Match>
+        <Class name="org.apache.hadoop.yarn.service.ClientAMPolicyProvider"/>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java
new file mode 100644
index 0000000..365df0f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+
+/**
+ * PolicyProvider mapping the Client to Service AM protocol to its ACL key.
+ */
+public class ClientAMPolicyProvider extends PolicyProvider {
+
+  private static final Service[] CLIENT_AM_SERVICE = {
+      new Service(
+          "security.yarn-service.client-am-protocol.acl",
+          ClientAMProtocol.class)};
+
+  /** Returns the shared ACL table for this protocol; do not modify it. */
+  @Override
+  public Service[] getServices() {
+    return CLIENT_AM_SERVICE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java
new file mode 100644
index 0000000..e19284b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
+import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB;
+
+import java.lang.annotation.Annotation;
+
+/**
+ * SecurityInfo describing Kerberos settings of the Client to Service AM RPC.
+ */
+public class ClientAMSecurityInfo extends SecurityInfo {
+  @Override
+  public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+    return null;
+  }
+
+  @Override
+  public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    if (protocol.equals(ClientAMProtocolPB.class)) {
+      return new KerberosInfo() {
+        @Override
+        public String serverPrincipal() {
+          return YarnServiceConstants.PRINCIPAL;
+        }
+
+        @Override
+        public String clientPrincipal() {
+          return null;
+        }
+
+        @Override
+        public Class<? extends Annotation> annotationType() {
+          return null;
+        }
+      };
+    }
+    // Any other protocol is not described by this SecurityInfo.
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index 8e4c34d..94dd8d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.service;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -65,6 +66,14 @@ public class ClientAMService extends AbstractService
     InetSocketAddress address = new InetSocketAddress(0);
     server = rpc.getServer(ClientAMProtocol.class, this, address, conf,
         context.secretManager, 1);
+
+    // Enable service authorization?
+    if (conf.getBoolean(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+        false)) {
+      this.server.refreshServiceAcl(getConfig(), new ClientAMPolicyProvider());
+    }
+
     server.start();
 
     String nodeHostString =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
index 94dbc6e..cd41ab7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ConfigFile;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 
+import java.nio.ByteBuffer;
+
 public class ServiceContext {
   public Service service = null;
   public SliderFileSystem fs;
@@ -34,6 +36,12 @@ public class ServiceContext {
   public ServiceScheduler scheduler;
   public ClientToAMTokenSecretManager secretManager;
   public ClientAMService clientAMService;
+  // tokens used for container launch
+  public ByteBuffer tokens;
+  // AM keytab principal
+  public String principal;
+  // AM keytab location
+  public String keytab;
 
   public ServiceContext() {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
index b0b4f06..1283604 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
@@ -20,33 +20,49 @@ package org.apache.hadoop.yarn.service;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
+import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
 import org.apache.hadoop.yarn.service.monitor.ServiceMonitor;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
-import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
-import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
+import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
 import java.util.Map;
 
+import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants.KEYTAB_LOCATION;
+
 public class ServiceMaster extends CompositeService {
 
   private static final Logger LOG =
@@ -63,13 +79,7 @@ public class ServiceMaster extends CompositeService {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    //TODO Deprecate slider conf, make sure works with yarn conf
     printSystemEnv();
-    if (UserGroupInformation.isSecurityEnabled()) {
-      UserGroupInformation.setConfiguration(conf);
-    }
-    LOG.info("Login user is {}", UserGroupInformation.getLoginUser());
-
     context = new ServiceContext();
     Path appDir = getAppDir();
     context.serviceHdfsDir = appDir.toString();
@@ -78,6 +88,10 @@ public class ServiceMaster extends CompositeService {
     fs.setAppDir(appDir);
     loadApplicationJson(context, fs);
 
+    if (UserGroupInformation.isSecurityEnabled()) {
+      context.tokens = recordTokensForContainers();
+      doSecureLogin();
+    }
     // Take yarn config from YarnFile and merge them into YarnConfiguration
     for (Map.Entry<String, String> entry : context.service
         .getConfiguration().getProperties().entrySet()) {
@@ -111,6 +125,100 @@ public class ServiceMaster extends CompositeService {
     super.serviceInit(conf);
   }
 
+  // Snapshot the current user's tokens for launching containers (e.g.
+  // localization requires the hdfs delegation tokens), minus the AM->RM token.
+  private ByteBuffer recordTokensForContainers() throws IOException {
+    Credentials copy = new Credentials(UserGroupInformation.getCurrentUser()
+        .getCredentials());
+    // Strip the AM->RM token before serializing so containers never see it.
+    Iterator<Token<?>> iter = copy.getAllTokens().iterator();
+    while (iter.hasNext()) {
+      Token<?> token = iter.next();
+      LOG.info(token.toString());
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove();
+      }
+    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    try {
+      copy.writeTokenStorageToStream(dob);
+    } finally {
+      dob.close();
+    }
+    return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+  }
+
+  // Keytab login for the AM. 1. Prefer the keytab localized for the service.
+  // 2. Otherwise fall back to a pre-installed keytab given as a file URI.
+  // 3. Strip hdfs delegation tokens so the keytab is used to talk to hdfs.
+  private void doSecureLogin() throws IOException, URISyntaxException {
+    // read the localized keytab specified by user
+    File keytab = new File(String.format(KEYTAB_LOCATION,
+        context.service.getName()));
+    if (!keytab.exists()) {
+      LOG.info("No keytab localized at " + keytab);
+      // Check if there exists a pre-installed keytab at host
+      String preInstalledKeytab = context.service.getKerberosPrincipal()
+          .getKeytab();
+      if (!StringUtils.isEmpty(preInstalledKeytab)) {
+        URI uri = new URI(preInstalledKeytab);
+        // getScheme() is null for scheme-less values such as a bare path
+        if ("file".equals(uri.getScheme())) {
+          keytab = new File(uri);
+          LOG.info("Using pre-installed keytab from localhost: " +
+              preInstalledKeytab);
+        }
+      }
+    }
+    if (!keytab.exists()) {
+      LOG.info("No keytab exists: " + keytab);
+      return;
+    }
+    String principal = context.service.getKerberosPrincipal()
+        .getPrincipalName();
+    if (StringUtils.isEmpty(principal)) {
+      principal = UserGroupInformation.getLoginUser().getShortUserName();
+      LOG.info("No principal name specified.  Will use AM " +
+          "login identity {} to attempt keytab-based login", principal);
+    }
+
+    Credentials credentials = UserGroupInformation.getCurrentUser()
+        .getCredentials();
+    LOG.info("User before logged in is: " + UserGroupInformation
+        .getCurrentUser());
+    String principalName = SecurityUtil.getServerPrincipal(principal,
+        ServiceUtils.getLocalHostName(getConfig()));
+    UserGroupInformation.loginUserFromKeytab(principalName,
+        keytab.getAbsolutePath());
+    // add back the credentials
+    UserGroupInformation.getCurrentUser().addCredentials(credentials);
+    LOG.info("User after logged in is: " + UserGroupInformation
+        .getCurrentUser());
+    context.principal = principalName;
+    context.keytab = keytab.getAbsolutePath();
+    removeHdfsDelegationToken(UserGroupInformation.getLoginUser());
+  }
+
+  // Drop HDFS delegation tokens from the given user's credentials so that
+  // the AM talks to hdfs with its keytab. NOTE(review): confirm that
+  // getCredentials() exposes the live token set rather than a copy.
+  private static void removeHdfsDelegationToken(UserGroupInformation user) {
+    if (!user.isFromKeytab()) {
+      LOG.error("AM is not holding on a keytab in a secure deployment:" +
+          " service will fail when tokens expire");
+    }
+    for (Iterator<Token<? extends TokenIdentifier>> it =
+         user.getCredentials().getAllTokens().iterator(); it.hasNext();) {
+      Token<? extends TokenIdentifier> candidate = it.next();
+      if (!candidate.getKind().equals(
+          DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) {
+        continue;
+      }
+      LOG.info("Remove HDFS delegation token {}.", candidate);
+      it.remove();
+    }
+  }
+
   protected ContainerId getAMContainerId() throws BadClusterStateException {
     return ContainerId.fromString(ServiceUtils.mandatoryEnvVariable(
         ApplicationConstants.Environment.CONTAINER_ID.name()));
@@ -133,6 +241,17 @@ public class ServiceMaster extends CompositeService {
   }
 
   @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting service as user " + UserGroupInformation
+        .getCurrentUser());
+    // Run the composite start-up under the login user's identity.
+    PrivilegedExceptionAction<Void> startChildren = () -> {
+      super.serviceStart();
+      return null;
+    };
+    UserGroupInformation.getLoginUser().doAs(startChildren);
+  }
+  @Override
   protected void serviceStop() throws Exception {
     LOG.info("Stopping app master");
     super.serviceStop();
@@ -146,7 +265,8 @@ public class ServiceMaster extends CompositeService {
 
   public static void main(String[] args) throws Exception {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
-    StringUtils.startupShutdownMessage(ServiceMaster.class, args, LOG);
+    org.apache.hadoop.util.StringUtils
+        .startupShutdownMessage(ServiceMaster.class, args, LOG);
     try {
       ServiceMaster serviceMaster = new ServiceMaster("Service Master");
       ShutdownHookManager.get()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 6bc5673..bea31cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -22,6 +22,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,6 +35,7 @@ import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
 import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
 import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.security.HadoopKerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -142,11 +144,29 @@ public class ServiceScheduler extends CompositeService {
   }
 
   public void buildInstance(ServiceContext context, Configuration configuration)
-      throws YarnException {
+      throws YarnException, IOException {
     app = context.service;
     executorService = Executors.newScheduledThreadPool(10);
-    RegistryOperations registryClient = RegistryOperationsFactory
-        .createInstance("ServiceScheduler", configuration);
+    RegistryOperations registryClient = null;
+    if (UserGroupInformation.isSecurityEnabled() &&
+        !StringUtils.isEmpty(context.principal)
+        && !StringUtils.isEmpty(context.keytab)) {
+      Configuration conf = getConfig();
+      // Only take the first section of the principal
+      // e.g. hdfs-demo@EXAMPLE.COM will take hdfs-demo
+      // This is because somehow zookeeper client only uses the first section
+      // for acl validations.
+      String username = new HadoopKerberosName(context.principal.trim())
+          .getServiceName();
+      LOG.info("Set registry user accounts: sasl:" + username);
+      conf.set(KEY_REGISTRY_USER_ACCOUNTS, "sasl:" + username);
+      registryClient = RegistryOperationsFactory
+          .createKerberosInstance(conf,
+              "Client", context.principal, context.keytab);
+    } else {
+      registryClient = RegistryOperationsFactory
+          .createInstance("ServiceScheduler", configuration);
+    }
     addIfService(registryClient);
     yarnRegistryOperations =
         createYarnRegistryOperations(context, registryClient);
@@ -171,7 +191,7 @@ public class ServiceScheduler extends CompositeService {
     dispatcher.setDrainEventsOnStop();
     addIfService(dispatcher);
 
-    containerLaunchService = new ContainerLaunchService(context.fs);
+    containerLaunchService = new ContainerLaunchService(context);
     addService(containerLaunchService);
 
     if (YarnConfiguration.timelineServiceV2Enabled(configuration)) {
@@ -408,7 +428,7 @@ public class ServiceScheduler extends CompositeService {
           }
         } catch (IOException e) {
           LOG.error(
-              "Failed to register app " + app.getName() + " in registry");
+              "Failed to register app " + app.getName() + " in registry", e);
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
new file mode 100644
index 0000000..e38fdb5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.api.records;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlElement;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The kerberos principal of the service.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+@ApiModel(description = "The kerberos principal of the service.")
+@javax.annotation.Generated(value = "io.swagger.codegen.languages" +
+    ".JavaClientCodegen", date = "2017-11-20T11:29:11.785-08:00")
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class KerberosPrincipal implements Serializable {
+  private static final long serialVersionUID = -6431667195287650037L;
+
+  @JsonProperty("principal_name")
+  @XmlElement(name = "principal_name")
+  private String principalName = null;
+
+  @JsonProperty("keytab")
+  @XmlElement(name = "keytab")
+  private String keytab = null;
+
+  public KerberosPrincipal principalName(String principalName) {
+    this.principalName = principalName;
+    return this;
+  }
+
+  /**
+   * The principal name of the service.
+   *
+   * @return principalName
+   **/
+  @ApiModelProperty(value = "The principal name of the service.")
+  public String getPrincipalName() {
+    return principalName;
+  }
+
+  public void setPrincipalName(String principalName) {
+    this.principalName = principalName;
+  }
+
+  public KerberosPrincipal keytab(String keytab) {
+    this.keytab = keytab;
+    return this;
+  }
+
+  /**
+   * The URI of the kerberos keytab. It supports two schemes, "hdfs"
+   * and "file". If the URI starts with the "hdfs://" scheme, it
+   * indicates the path on hdfs where the keytab is stored. The keytab
+   * will be localized by YARN and made available to AM in its local
+   * directory. If the URI starts with the "file://" scheme, it indicates
+   * a path on the local host where the keytab is presumably installed by
+   * admins upfront.
+   *
+   * @return keytab
+   **/
+  @ApiModelProperty(value = "The URI of the kerberos keytab. It supports two " +
+      "schemes \"hdfs\" and \"file\". If the URI starts with \"hdfs://\" " +
+      "scheme, it indicates the path on hdfs where the keytab is stored. The " +
+      "keytab will be localized by YARN and made available to AM in its local" +
+      " directory. If the URI starts with \"file://\" scheme, it indicates a " +
+      "path on the local host where the keytab is presumbaly installed by " +
+      "admins upfront. ")
+  public String getKeytab() {
+    return keytab;
+  }
+
+  public void setKeytab(String keytab) {
+    this.keytab = keytab;
+  }
+
+
+  @Override
+  public boolean equals(java.lang.Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    KerberosPrincipal kerberosPrincipal = (KerberosPrincipal) o;
+    return Objects.equals(this.principalName, kerberosPrincipal
+        .principalName) &&
+        Objects.equals(this.keytab, kerberosPrincipal.keytab);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(principalName, keytab);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class KerberosPrincipal {\n");
+
+    sb.append("    principalName: ").append(toIndentedString(principalName))
+        .append("\n");
+    sb.append("    keytab: ").append(toIndentedString(keytab)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(java.lang.Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
index 8045822..392b71e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
@@ -71,6 +71,9 @@ public class Service extends BaseResource {
   private ServiceState state = null;
   private Map<String, String> quicklinks = new HashMap<>();
   private String queue = null;
+  @JsonProperty("kerberos_principal")
+  @XmlElement(name = "kerberos_principal")
+  private KerberosPrincipal kerberosPrincipal = new KerberosPrincipal();
 
   /**
    * A unique service name.
@@ -335,6 +338,24 @@ public class Service extends BaseResource {
     this.queue = queue;
   }
 
+  public Service kerberosPrincipal(KerberosPrincipal kerberosPrincipal) {
+    this.kerberosPrincipal = kerberosPrincipal;
+    return this;
+  }
+
+  /**
+   * The Kerberos Principal of the service.
+   * @return kerberosPrincipal
+   **/
+  @ApiModelProperty(value = "The Kerberos Principal of the service")
+  public KerberosPrincipal getKerberosPrincipal() {
+    return kerberosPrincipal;
+  }
+
+  public void setKerberosPrincipal(KerberosPrincipal kerberosPrincipal) {
+    this.kerberosPrincipal = kerberosPrincipal;
+  }
+
   @Override
   public boolean equals(java.lang.Object o) {
     if (this == o) {
@@ -376,6 +397,8 @@ public class Service extends BaseResource {
     sb.append("    quicklinks: ").append(toIndentedString(quicklinks))
         .append("\n");
     sb.append("    queue: ").append(toIndentedString(queue)).append("\n");
+    sb.append("    kerberosPrincipal: ")
+        .append(toIndentedString(kerberosPrincipal)).append("\n");
     sb.append("}");
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index d1b6026..81c56d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -28,12 +28,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.client.util.YarnClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -79,6 +83,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.text.MessageFormat;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -98,7 +105,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
   //TODO disable retry so that client / rest API doesn't block?
   protected YarnClient yarnClient;
   // Avoid looking up applicationId from fs all the time.
-  private Map<String, ApplicationId> cachedAppIds = new ConcurrentHashMap<>();
+  private Map<String, AppInfo> cachedAppInfo = new ConcurrentHashMap<>();
 
   private RegistryOperations registryClient;
   private CuratorFramework curatorClient;
@@ -210,7 +217,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     // Write the definition first and then submit - AM will read the definition
     createDirAndPersistApp(appDir, service);
     ApplicationId appId = submitApp(service);
-    cachedAppIds.put(serviceName, appId);
+    cachedAppInfo.put(serviceName, new AppInfo(appId, service
+        .getKerberosPrincipal().getPrincipalName()));
     service.setId(appId.toString());
     // update app definition with appId
     persistAppDef(appDir, service);
@@ -224,8 +232,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     Service persistedService =
         ServiceApiUtil.loadService(fs, serviceName);
     if (!StringUtils.isEmpty(persistedService.getId())) {
-      cachedAppIds.put(persistedService.getName(),
-          ApplicationId.fromString(persistedService.getId()));
+      cachedAppInfo.put(persistedService.getName(), new AppInfo(
+          ApplicationId.fromString(persistedService.getId()),
+          persistedService.getKerberosPrincipal().getPrincipalName()));
     } else {
       throw new YarnException(persistedService.getName()
           + " appId is null, may be not submitted to YARN yet");
@@ -278,8 +287,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       throw new YarnException(
           serviceName + " appId is null, may be not submitted to YARN yet");
     }
-    cachedAppIds.put(persistedService.getName(),
-        ApplicationId.fromString(persistedService.getId()));
+    cachedAppInfo.put(persistedService.getName(), new AppInfo(
+        ApplicationId.fromString(persistedService.getId()), persistedService
+        .getKerberosPrincipal().getPrincipalName()));
     return flexComponents(serviceName, componentCounts, persistedService);
   }
 
@@ -328,7 +338,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       throw new YarnException(serviceName + " AM hostname is empty");
     }
     ClientAMProtocol proxy =
-        createAMProxy(appReport.getHost(), appReport.getRpcPort());
+        createAMProxy(serviceName, appReport);
     proxy.flexComponents(requestBuilder.build());
     for (Map.Entry<String, Long> entry : original.entrySet()) {
       LOG.info("[COMPONENT {}]: number of containers changed from {} to {}",
@@ -366,8 +376,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     LOG.info("Stopping service {}, with appId = {}", serviceName, currentAppId);
     try {
       ClientAMProtocol proxy =
-          createAMProxy(report.getHost(), report.getRpcPort());
-      cachedAppIds.remove(serviceName);
+          createAMProxy(serviceName, report);
+      cachedAppInfo.remove(serviceName);
       if (proxy != null) {
         // try to stop the app gracefully.
         StopRequestProto request = StopRequestProto.newBuilder().build();
@@ -406,8 +416,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
         }
       }
     } catch (IOException | YarnException | InterruptedException e) {
-      LOG.info("Failed to stop " + serviceName
-          + " gracefully, forcefully kill the app.");
+      LOG.info("Failed to stop " + serviceName + " gracefully due to: "
+          + e.getMessage() + ", forcefully kill the app.");
       yarnClient.killApplication(currentAppId, "Forcefully kill the app");
     }
     return EXIT_SUCCESS;
@@ -421,7 +431,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     Path appDir = fs.buildClusterDirPath(serviceName);
     FileSystem fileSystem = fs.getFileSystem();
     // remove from the appId cache
-    cachedAppIds.remove(serviceName);
+    cachedAppInfo.remove(serviceName);
     if (fileSystem.exists(appDir)) {
       if (fileSystem.delete(appDir, true)) {
         LOG.info("Successfully deleted service dir for " + serviceName + ": "
@@ -552,7 +562,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     // copy jars to hdfs and add to localResources
     addJarResource(serviceName, localResources);
     // add keytab if in secure env
-    addKeytabResourceIfSecure(fs, localResources, conf, serviceName);
+    addKeytabResourceIfSecure(fs, localResources, app);
     if (LOG.isDebugEnabled()) {
       printLocalResources(localResources);
     }
@@ -581,6 +591,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     amLaunchContext.setCommands(Collections.singletonList(cmdStr));
     amLaunchContext.setEnvironment(env);
     amLaunchContext.setLocalResources(localResources);
+    addHdfsDelegationTokenIfSecure(amLaunchContext);
     submissionContext.setAMContainerSpec(amLaunchContext);
     yarnClient.submitApplication(submissionContext);
     return submissionContext.getApplicationId();
@@ -771,38 +782,75 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     return appJson;
   }
 
+  private void addHdfsDelegationTokenIfSecure(ContainerLaunchContext amContext)
+      throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    Credentials credentials = new Credentials();
+    String tokenRenewer = YarnClientUtils.getRmPrincipal(getConfig());
+    if (StringUtils.isEmpty(tokenRenewer)) {
+      throw new IOException(
+          "Can't get Master Kerberos principal for the RM to use as renewer");
+    }
+    // Get hdfs dt
+    final org.apache.hadoop.security.token.Token<?>[] tokens =
+        fs.getFileSystem().addDelegationTokens(tokenRenewer, credentials);
+    if (tokens != null && tokens.length != 0) {
+      for (Token<?> token : tokens) {
+        LOG.debug("Got DT: " + token);
+      }
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+      ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+      amContext.setTokens(fsTokens);
+    }
+  }
+
   private void addKeytabResourceIfSecure(SliderFileSystem fileSystem,
-      Map<String, LocalResource> localResource, Configuration conf,
-      String serviceName) throws IOException, BadConfigException {
+      Map<String, LocalResource> localResource, Service service)
+      throws IOException, YarnException {
     if (!UserGroupInformation.isSecurityEnabled()) {
       return;
     }
-    String keytabPreInstalledOnHost =
-        conf.get(YarnServiceConf.KEY_AM_KEYTAB_LOCAL_PATH);
-    if (StringUtils.isEmpty(keytabPreInstalledOnHost)) {
-      String amKeytabName =
-          conf.get(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_NAME);
-      String keytabDir = conf.get(YarnServiceConf.KEY_HDFS_KEYTAB_DIR);
-      Path keytabPath =
-          fileSystem.buildKeytabPath(keytabDir, amKeytabName, serviceName);
-      if (fileSystem.getFileSystem().exists(keytabPath)) {
-        LocalResource keytabRes =
-            fileSystem.createAmResource(keytabPath, LocalResourceType.FILE);
-        localResource
-            .put(YarnServiceConstants.KEYTAB_DIR + "/" + amKeytabName, keytabRes);
-        LOG.info("Adding AM keytab on hdfs: " + keytabPath);
-      } else {
-        LOG.warn("No keytab file was found at {}.", keytabPath);
-        if (conf.getBoolean(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_REQUIRED, false)) {
-          throw new BadConfigException("No keytab file was found at %s.",
-              keytabPath);
-        } else {
-          LOG.warn("The AM will be "
-              + "started without a kerberos authenticated identity. "
-              + "The service is therefore not guaranteed to remain "
-              + "operational beyond 24 hours.");
-        }
+    String principalName = service.getKerberosPrincipal().getPrincipalName();
+    if (StringUtils.isEmpty(principalName)) {
+      LOG.warn("No Kerberos principal name specified for " + service.getName());
+      return;
+    }
+    if(StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
+      LOG.warn("No Kerberos keytab specified for " + service.getName());
+      return;
+    }
+
+    URI keytabURI;
+    try {
+      keytabURI = new URI(service.getKerberosPrincipal().getKeytab());
+    } catch (URISyntaxException e) {
+      throw new YarnException(e);
+    }
+
+    switch (keytabURI.getScheme()) {
+    case "hdfs":
+      Path keytabOnhdfs = new Path(keytabURI);
+      if (!fileSystem.getFileSystem().exists(keytabOnhdfs)) {
+        LOG.warn(service.getName() + "'s keytab (principalName = " +
+            principalName + ") doesn't exist at: " + keytabOnhdfs);
+        return;
       }
+      LocalResource keytabRes =
+          fileSystem.createAmResource(keytabOnhdfs, LocalResourceType.FILE);
+      localResource.put(String.format(YarnServiceConstants.KEYTAB_LOCATION,
+          service.getName()), keytabRes);
+      LOG.debug("Adding " + service.getName() + "'s keytab for " +
+          "localization, uri = " + keytabOnhdfs);
+      break;
+    case "file":
+      LOG.debug("Using a keytab from localhost: " + keytabURI);
+      break;
+    default:
+      LOG.warn("Unsupported URI scheme " + keytabURI);
+      break;
     }
   }
 
@@ -856,7 +904,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       return "";
     }
     ClientAMProtocol amProxy =
-        createAMProxy(appReport.getHost(), appReport.getRpcPort());
+        createAMProxy(appReport.getName(), appReport);
     GetStatusResponseProto response =
         amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
     return response.getStatus();
@@ -886,7 +934,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       return appSpec;
     }
     ClientAMProtocol amProxy =
-        createAMProxy(appReport.getHost(), appReport.getRpcPort());
+        createAMProxy(serviceName, appReport);
     GetStatusResponseProto response =
         amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
     appSpec = jsonSerDeser.fromJson(response.getStatus());
@@ -935,18 +983,37 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     }
   }
 
-  protected ClientAMProtocol createAMProxy(String host, int port)
-      throws IOException {
+  protected ClientAMProtocol createAMProxy(String serviceName,
+      ApplicationReport appReport) throws IOException, YarnException {
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      if (!cachedAppInfo.containsKey(serviceName)) {
+        Service persistedService  = ServiceApiUtil.loadService(fs, serviceName);
+        cachedAppInfo.put(serviceName, new AppInfo(appReport.getApplicationId(),
+            persistedService.getKerberosPrincipal().getPrincipalName()));
+      }
+      String principalName = cachedAppInfo.get(serviceName).principalName;
+      // Inject the principal into hadoop conf, because Hadoop
+      // SaslRpcClient#getServerPrincipal requires a config for the
+      // principal
+      if (!StringUtils.isEmpty(principalName)) {
+        getConfig().set(PRINCIPAL, principalName);
+      } else {
+        throw new YarnException("No principal specified in the persisted " +
+            "service definition, fail to connect to AM.");
+      }
+    }
     InetSocketAddress address =
-        NetUtils.createSocketAddrForHost(host, port);
+        NetUtils.createSocketAddrForHost(appReport.getHost(), appReport
+            .getRpcPort());
     return ClientAMProxy.createProxy(getConfig(), ClientAMProtocol.class,
         UserGroupInformation.getCurrentUser(), rpc, address);
   }
 
   public synchronized ApplicationId getAppId(String serviceName)
       throws IOException, YarnException {
-    if (cachedAppIds.containsKey(serviceName)) {
-      return cachedAppIds.get(serviceName);
+    if (cachedAppInfo.containsKey(serviceName)) {
+      return cachedAppInfo.get(serviceName).appId;
     }
     Service persistedService = ServiceApiUtil.loadService(fs, serviceName);
     if (persistedService == null) {
@@ -954,7 +1021,18 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
           + " doesn't exist on hdfs. Please check if the app exists in RM");
     }
     ApplicationId currentAppId = ApplicationId.fromString(persistedService.getId());
-    cachedAppIds.put(serviceName, currentAppId);
+    cachedAppInfo.put(serviceName, new AppInfo(currentAppId, persistedService
+        .getKerberosPrincipal().getPrincipalName()));
     return currentAppId;
   }
+
+  private static class AppInfo {
+    ApplicationId appId;
+    String principalName;
+
+    AppInfo(ApplicationId appId, String principalName) {
+      this.appId = appId;
+      this.principalName = principalName;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 88f4763..4e05e5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -101,7 +101,7 @@ public class Component implements EventHandler<ComponentEvent> {
       new StateMachineFactory<Component, ComponentState, ComponentEventType, ComponentEvent>(
           INIT)
            // INIT will only got to FLEXING
-          .addTransition(INIT, EnumSet.of(STABLE, FLEXING),
+          .addTransition(INIT, EnumSet.of(STABLE, FLEXING, INIT),
               FLEX, new FlexComponentTransition())
           // container recovered on AM restart
           .addTransition(INIT, INIT, CONTAINER_RECOVERED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
index 684d980..ea8904a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
@@ -53,13 +53,6 @@ public class YarnServiceConf {
    */
   public static final String YARN_SERVICE_BASE_PATH = "yarn.service.base.path";
 
-  //TODO rename
-  /** Declare that a keytab must be provided */
-  public static final String KEY_AM_LOGIN_KEYTAB_REQUIRED = "slider.am.login.keytab.required";
-  public static final String KEY_AM_LOGIN_KEYTAB_NAME = "slider.am.login.keytab.name";
-  public static final String KEY_HDFS_KEYTAB_DIR = "slider.hdfs.keytab.dir";
-  public static final String KEY_AM_KEYTAB_LOCAL_PATH = "slider.am.keytab.local.path";
-
   /**
    * maximum number of failed containers (in a single component)
    * before the app exits

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
index 3973759..0378d24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
@@ -40,6 +40,8 @@ public interface YarnServiceConstants {
   String APP_TYPE = "yarn-service";
 
   String KEYTAB_DIR = "keytabs";
+  String KEYTAB_LOCATION = KEYTAB_DIR + "/%s" + ".keytab";
+
   String RESOURCE_DIR = "resources";
 
 
@@ -89,4 +91,5 @@ public interface YarnServiceConstants {
   String ERR_FILE = "stderr.txt";
 
   String CONTENT = "content";
+  String PRINCIPAL = "yarn.service.am.principal";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
index 2d7c3bb..e1e88cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
@@ -19,16 +19,15 @@
 package org.apache.hadoop.yarn.service.containerlaunch;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
 import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.service.ServiceContext;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
-import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,10 +49,6 @@ public class AbstractLauncher {
     LoggerFactory.getLogger(AbstractLauncher.class);
   public static final String CLASSPATH = "CLASSPATH";
   /**
-   * Filesystem to use for the launch
-   */
-  protected final CoreFileSystem coreFileSystem;
-  /**
    * Env vars; set up at final launch stage
    */
   protected final Map<String, String> envVars = new HashMap<>();
@@ -63,25 +58,15 @@ public class AbstractLauncher {
   protected final Map<String, LocalResource> localResources = new HashMap<>();
   protected final Map<String, String> mountPaths = new HashMap<>();
   private final Map<String, ByteBuffer> serviceData = new HashMap<>();
-  // security
-  protected final Credentials credentials;
   protected boolean yarnDockerMode = false;
   protected String dockerImage;
   protected String dockerNetwork = DEFAULT_DOCKER_NETWORK;
   protected String dockerHostname;
   protected String runPrivilegedContainer;
+  private ServiceContext context;
 
-
-  /**
-   * Create instance.
-   * @param coreFileSystem filesystem
-   * @param credentials initial set of credentials -null is permitted
-   */
-  public AbstractLauncher(
-      CoreFileSystem coreFileSystem,
-      Credentials credentials) {
-    this.coreFileSystem = coreFileSystem;
-    this.credentials = credentials != null ? credentials: new Credentials();
+  public AbstractLauncher(ServiceContext context) {
+    this.context = context;
   }
   
   public void setYarnDockerMode(boolean yarnDockerMode){
@@ -113,14 +98,6 @@ public class AbstractLauncher {
     mountPaths.put(subPath, mountPath);
   }
 
-  /**
-   * Accessor to the credentials
-   * @return the credentials associated with this launcher
-   */
-  public Credentials getCredentials() {
-    return credentials;
-  }
-
 
   public void addCommand(String cmd) {
     commands.add(cmd);
@@ -160,9 +137,9 @@ public class AbstractLauncher {
     containerLaunchContext.setLocalResources(localResources);
 
     //tokens
-    log.debug("{} tokens", credentials.numberOfTokens());
-    containerLaunchContext.setTokens(CredentialUtils.marshallCredentials(
-        credentials));
+    if (context.tokens != null) {
+      containerLaunchContext.setTokens(context.tokens.duplicate());
+    }
 
     if(yarnDockerMode){
       Map<String, String> env = containerLaunchContext.getEnvironment();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
index b9f3a24..e07661b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.containerlaunch;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.service.ServiceContext;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.provider.ProviderService;
@@ -40,10 +41,11 @@ public class ContainerLaunchService extends AbstractService{
 
   private ExecutorService executorService;
   private SliderFileSystem fs;
-
-  public ContainerLaunchService(SliderFileSystem fs) {
+  private ServiceContext context;
+  public ContainerLaunchService(ServiceContext context) {
     super(ContainerLaunchService.class.getName());
-    this.fs = fs;
+    this.fs = context.fs;
+    this.context = context;
   }
 
   @Override
@@ -84,7 +86,7 @@ public class ContainerLaunchService extends AbstractService{
       Component compSpec = instance.getCompSpec();
       ProviderService provider = ProviderFactory.getProviderService(
           compSpec.getArtifact());
-      AbstractLauncher launcher = new AbstractLauncher(fs, null);
+      AbstractLauncher launcher = new AbstractLauncher(context);
       try {
         provider.buildContainerLaunchContext(launcher, service,
             instance, fs, getConfig(), container);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java
deleted file mode 100644
index fce58e5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.service.containerlaunch;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.text.DateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
-
-/**
- * Utils to work with credentials and tokens.
- *
- * Designed to be movable to Hadoop core
- */
-public final class CredentialUtils {
-
-  private CredentialUtils() {
-  }
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(CredentialUtils.class);
-
-  /**
-   * Save credentials to a byte buffer. Returns null if there were no
-   * credentials to save
-   * @param credentials credential set
-   * @return a byte buffer of serialized tokens
-   * @throws IOException if the credentials could not be written to the stream
-   */
-  public static ByteBuffer marshallCredentials(Credentials credentials) throws IOException {
-    ByteBuffer buffer = null;
-    if (!credentials.getAllTokens().isEmpty()) {
-      DataOutputBuffer dob = new DataOutputBuffer();
-      try {
-        credentials.writeTokenStorageToStream(dob);
-      } finally {
-        dob.close();
-      }
-      buffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    }
-    return buffer;
-  }
-
-  /**
-   * Save credentials to a file
-   * @param file file to save to (will be overwritten)
-   * @param credentials credentials to write
-   * @throws IOException
-   */
-  public static void saveTokens(File file,
-      Credentials credentials) throws IOException {
-    try(DataOutputStream daos = new DataOutputStream(
-        new FileOutputStream(file))) {
-      credentials.writeTokenStorageToStream(daos);
-    }
-  }
-
-  /**
-   * Look up and return the resource manager's principal. This method
-   * automatically does the <code>_HOST</code> replacement in the principal and
-   * correctly handles HA resource manager configurations.
-   *
-   * From: YARN-4629
-   * @param conf the {@link Configuration} file from which to read the
-   * principal
-   * @return the resource manager's principal string
-   * @throws IOException thrown if there's an error replacing the host name
-   */
-  public static String getRMPrincipal(Configuration conf) throws IOException {
-    String principal = conf.get(RM_PRINCIPAL, "");
-    String hostname;
-    Preconditions.checkState(!principal.isEmpty(), "Not set: " + RM_PRINCIPAL);
-
-    if (HAUtil.isHAEnabled(conf)) {
-      YarnConfiguration yarnConf = new YarnConfiguration(conf);
-      if (yarnConf.get(RM_HA_ID) == null) {
-        // If RM_HA_ID is not configured, use the first of RM_HA_IDS.
-        // Any valid RM HA ID should work.
-        String[] rmIds = yarnConf.getStrings(RM_HA_IDS);
-        Preconditions.checkState((rmIds != null) && (rmIds.length > 0),
-            "Not set " + RM_HA_IDS);
-        yarnConf.set(RM_HA_ID, rmIds[0]);
-      }
-
-      hostname = yarnConf.getSocketAddr(
-          RM_ADDRESS,
-          DEFAULT_RM_ADDRESS,
-          DEFAULT_RM_PORT).getHostName();
-    } else {
-      hostname = conf.getSocketAddr(
-          RM_ADDRESS,
-          DEFAULT_RM_ADDRESS,
-          DEFAULT_RM_PORT).getHostName();
-    }
-    return SecurityUtil.getServerPrincipal(principal, hostname);
-  }
-
-  /**
-   * Create and add any filesystem delegation tokens with
-   * the RM(s) configured to be able to renew them. Returns null
-   * on an insecure cluster (i.e. harmless)
-   * @param conf configuration
-   * @param fs filesystem
-   * @param credentials credentials to update
-   * @return a list of all added tokens.
-   * @throws IOException
-   */
-  public static Token<?>[] addRMRenewableFSDelegationTokens(Configuration conf,
-      FileSystem fs,
-      Credentials credentials) throws IOException {
-    Preconditions.checkArgument(conf != null);
-    Preconditions.checkArgument(credentials != null);
-    if (UserGroupInformation.isSecurityEnabled()) {
-      return fs.addDelegationTokens(CredentialUtils.getRMPrincipal(conf),
-          credentials);
-    }
-    return null;
-  }
-
-  /**
-   * Add an FS delegation token which can be renewed by the current user
-   * @param fs filesystem
-   * @param credentials credentials to update
-   * @throws IOException problems.
-   */
-  public static void addSelfRenewableFSDelegationTokens(
-      FileSystem fs,
-      Credentials credentials) throws IOException {
-    Preconditions.checkArgument(fs != null);
-    Preconditions.checkArgument(credentials != null);
-    fs.addDelegationTokens(
-        getSelfRenewer(),
-        credentials);
-  }
-
-  public static String getSelfRenewer() throws IOException {
-    return UserGroupInformation.getLoginUser().getShortUserName();
-  }
-
-  /**
-   * Create and add an RM delegation token to the credentials
-   * @param yarnClient Yarn Client
-   * @param credentials to add token to
-   * @return the token which was added
-   * @throws IOException
-   * @throws YarnException
-   */
-  public static Token<TokenIdentifier> addRMDelegationToken(YarnClient yarnClient,
-      Credentials credentials)
-      throws IOException, YarnException {
-    Configuration conf = yarnClient.getConfig();
-    Text rmPrincipal = new Text(CredentialUtils.getRMPrincipal(conf));
-    Text rmDTService = ClientRMProxy.getRMDelegationTokenService(conf);
-    Token<TokenIdentifier> rmDelegationToken =
-        ConverterUtils.convertFromYarn(
-            yarnClient.getRMDelegationToken(rmPrincipal),
-            rmDTService);
-    credentials.addToken(rmDelegationToken.getService(), rmDelegationToken);
-    return rmDelegationToken;
-  }
-
-  public static Token<TimelineDelegationTokenIdentifier> maybeAddTimelineToken(
-      Configuration conf,
-      Credentials credentials)
-      throws IOException, YarnException {
-    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) {
-      LOG.debug("Timeline service enabled -fetching token");
-
-      try(TimelineClient timelineClient = TimelineClient.createTimelineClient()) {
-        timelineClient.init(conf);
-        timelineClient.start();
-        Token<TimelineDelegationTokenIdentifier> token =
-            timelineClient.getDelegationToken(
-                CredentialUtils.getRMPrincipal(conf));
-        credentials.addToken(token.getService(), token);
-        return token;
-      }
-    } else {
-      LOG.debug("Timeline service is disabled");
-      return null;
-    }
-  }
-
-  /**
-   * Filter a list of tokens from a set of credentials
-   * @param credentials credential source (a new credential set os re
-   * @param filter List of tokens to strip out
-   * @return a new, filtered, set of credentials
-   */
-  public static Credentials filterTokens(Credentials credentials,
-      List<Text> filter) {
-    Credentials result = new Credentials(credentials);
-    Iterator<Token<? extends TokenIdentifier>> iter =
-        result.getAllTokens().iterator();
-    while (iter.hasNext()) {
-      Token<? extends TokenIdentifier> token = iter.next();
-      LOG.debug("Token {}", token.getKind());
-      if (filter.contains(token.getKind())) {
-        LOG.debug("Filtering token {}", token.getKind());
-        iter.remove();
-      }
-    }
-    return result;
-  }
-
-  public static String dumpTokens(Credentials credentials, String separator) {
-    ArrayList<Token<? extends TokenIdentifier>> sorted =
-        new ArrayList<>(credentials.getAllTokens());
-    Collections.sort(sorted, new TokenComparator());
-    StringBuilder buffer = new StringBuilder(sorted.size()* 128);
-    for (Token<? extends TokenIdentifier> token : sorted) {
-      buffer.append(tokenToString(token)).append(separator);
-    }
-    return buffer.toString();
-  }
-
-  /**
-   * Create a string for people to look at
-   * @param token token to convert to a string form
-   * @return a printable view of the token
-   */
-  public static String tokenToString(Token<? extends TokenIdentifier> token) {
-    DateFormat df = DateFormat.getDateTimeInstance(
-        DateFormat.SHORT, DateFormat.SHORT);
-    StringBuilder buffer = new StringBuilder(128);
-    buffer.append(token.toString());
-    try {
-      TokenIdentifier ti = token.decodeIdentifier();
-      buffer.append("; ").append(ti);
-      if (ti instanceof AbstractDelegationTokenIdentifier) {
-        // details in human readable form, and compensate for information HDFS DT omits
-        AbstractDelegationTokenIdentifier dt = (AbstractDelegationTokenIdentifier) ti;
-        buffer.append("; Renewer: ").append(dt.getRenewer());
-        buffer.append("; Issued: ")
-            .append(df.format(new Date(dt.getIssueDate())));
-        buffer.append("; Max Date: ")
-            .append(df.format(new Date(dt.getMaxDate())));
-      }
-    } catch (IOException e) {
-      //marshall problem; not ours
-      LOG.debug("Failed to decode {}: {}", token, e, e);
-    }
-    return buffer.toString();
-  }
-
-  /**
-   * Get the expiry time of a token.
-   * @param token token to examine
-   * @return the time in milliseconds after which the token is invalid.
-   * @throws IOException
-   */
-  public static long getTokenExpiryTime(Token token) throws IOException {
-    TokenIdentifier identifier = token.decodeIdentifier();
-    Preconditions.checkState(identifier instanceof AbstractDelegationTokenIdentifier,
-        "Token %s of type: %s has an identifier which cannot be examined: %s",
-        token, token.getClass(), identifier);
-    AbstractDelegationTokenIdentifier id =
-        (AbstractDelegationTokenIdentifier) identifier;
-    return id.getMaxDate();
-  }
-
-  private static class TokenComparator
-      implements Comparator<Token<? extends TokenIdentifier>>, Serializable {
-    @Override
-    public int compare(Token<? extends TokenIdentifier> left,
-        Token<? extends TokenIdentifier> right) {
-      return left.getKind().toString().compareTo(right.getKind().toString());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java
new file mode 100644
index 0000000..766da0d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Yarn Service framework.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.service;
+import org.apache.hadoop.classification.InterfaceAudience;
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
index c0c44c3..d65a196 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.service.provider;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -28,21 +27,18 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.service.ServiceContext;
-import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.ConfigFile;
 import org.apache.hadoop.yarn.service.api.records.ConfigFormat;
-import org.apache.hadoop.yarn.service.api.records.Configuration;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
-import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
 import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
 import org.apache.hadoop.yarn.service.exceptions.BadCommandArgumentsException;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.apache.hadoop.yarn.service.utils.PublishedConfiguration;
 import org.apache.hadoop.yarn.service.utils.PublishedConfigurationOutputter;
-import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
+import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -163,53 +159,6 @@ public class ProviderUtils implements YarnServiceConstants {
     }
   }
 
-  /**
-   * Localize the service keytabs for the service.
-   * @param launcher container launcher
-   * @param fileSystem file system
-   * @throws IOException trouble uploading to HDFS
-   */
-  public void localizeServiceKeytabs(AbstractLauncher launcher,
-      SliderFileSystem fileSystem, Service service) throws IOException {
-
-    Configuration conf = service.getConfiguration();
-    String keytabPathOnHost =
-        conf.getProperty(YarnServiceConf.KEY_AM_KEYTAB_LOCAL_PATH);
-    if (ServiceUtils.isUnset(keytabPathOnHost)) {
-      String amKeytabName =
-          conf.getProperty(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_NAME);
-      String keytabDir =
-          conf.getProperty(YarnServiceConf.KEY_HDFS_KEYTAB_DIR);
-      // we need to localize the keytab files in the directory
-      Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null,
-          service.getName());
-      boolean serviceKeytabsDeployed = false;
-      if (fileSystem.getFileSystem().exists(keytabDirPath)) {
-        FileStatus[] keytabs = fileSystem.getFileSystem().listStatus(
-            keytabDirPath);
-        LocalResource keytabRes;
-        for (FileStatus keytab : keytabs) {
-          if (!amKeytabName.equals(keytab.getPath().getName())
-              && keytab.getPath().getName().endsWith(".keytab")) {
-            serviceKeytabsDeployed = true;
-            log.info("Localizing keytab {}", keytab.getPath().getName());
-            keytabRes = fileSystem.createAmResource(keytab.getPath(),
-                LocalResourceType.FILE);
-            launcher.addLocalResource(KEYTAB_DIR + "/" +
-                    keytab.getPath().getName(),
-                keytabRes);
-          }
-        }
-      }
-      if (!serviceKeytabsDeployed) {
-        log.warn("No service keytabs for the service have been localized.  "
-            + "If the service requires keytabs for secure operation, "
-            + "please ensure that the required keytabs have been uploaded "
-            + "to the folder {}", keytabDirPath);
-      }
-    }
-  }
-
   public static Path initCompInstanceDir(SliderFileSystem fs,
       ComponentInstance instance) {
     Path compDir = new Path(new Path(fs.getAppDir(), "components"),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
index b58cea8..d5ea45c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.security.HadoopKerberosName;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
@@ -40,6 +42,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -86,6 +90,17 @@ public class ServiceApiUtil {
           "No component specified for " + service.getName());
     }
 
+    if (UserGroupInformation.isSecurityEnabled()) {
+      if (!StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
+        try {
+          // validate URI format
+          new URI(service.getKerberosPrincipal().getKeytab());
+        } catch (URISyntaxException e) {
+          throw new IllegalArgumentException(e);
+        }
+      }
+    }
+
     // Validate there are no component name collisions (collisions are not
     // currently supported) and add any components from external services
     Configuration globalConf = service.getConfiguration();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message