hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lium...@apache.org
Subject [2/2] hadoop git commit: HADOOP-14443. Azure: Support retry and client side failover for authorization, SASKey and delegation token generation. Contributed by Santhosh G Nayak
Date Fri, 30 Jun 2017 23:57:08 GMT
HADOOP-14443. Azure: Support retry and client side failover for authorization, SASKey and delegation token generation. Contributed by Santhosh G Nayak


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/38996fdc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/38996fdc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/38996fdc

Branch: refs/heads/trunk
Commit: 38996fdcf0987d1da00ce46f8284d8fcdce57329
Parents: bcba844
Author: Mingliang Liu <liuml07@apache.org>
Authored: Thu Jun 29 16:13:04 2017 -0700
Committer: Mingliang Liu <liuml07@apache.org>
Committed: Fri Jun 30 16:53:48 2017 -0700

----------------------------------------------------------------------
 .../hadoop/fs/azure/NativeAzureFileSystem.java  |  39 +--
 .../fs/azure/RemoteSASKeyGeneratorImpl.java     | 268 +++++++------------
 .../fs/azure/RemoteWasbAuthorizerImpl.java      | 225 ++++++----------
 .../fs/azure/SecureWasbRemoteCallHelper.java    | 210 +++++++++++++++
 .../hadoop/fs/azure/WasbRemoteCallHelper.java   | 259 +++++++++++++-----
 .../hadoop/fs/azure/security/Constants.java     |  20 +-
 .../hadoop/fs/azure/security/JsonUtils.java     |  52 ++++
 .../RemoteWasbDelegationTokenManager.java       | 162 +++++++++++
 .../hadoop/fs/azure/security/SecurityUtils.java |  86 ------
 .../hadoop/fs/azure/security/TokenUtils.java    |  60 +++++
 .../security/WasbDelegationTokenManager.java    |  54 ++++
 .../fs/azure/security/WasbTokenRenewer.java     |  77 +-----
 .../hadoop-azure/src/site/markdown/index.md     |  44 ++-
 .../TestNativeAzureFileSystemAuthorization.java |   2 +-
 .../fs/azure/TestWasbRemoteCallHelper.java      | 228 +++++++++++++---
 15 files changed, 1170 insertions(+), 616 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 22f79ff..f999992 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -27,9 +27,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.URL;
 import java.nio.charset.Charset;
-import java.security.PrivilegedExceptionAction;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
@@ -65,15 +63,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
 import org.apache.hadoop.fs.azure.security.Constants;
-import org.apache.hadoop.fs.azure.security.SecurityUtils;
+import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
+import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
@@ -1177,7 +1174,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
   private UserGroupInformation ugi;
 
-  private String delegationToken = null;
+  private WasbDelegationTokenManager wasbDelegationTokenManager;
 
   public NativeAzureFileSystem() {
     // set store in initialize()
@@ -1327,9 +1324,7 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     if (UserGroupInformation.isSecurityEnabled() && kerberosSupportEnabled) {
-      DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
-      authURL = new DelegationTokenAuthenticatedURL(authenticator);
-      credServiceUrl = SecurityUtils.getCredServiceUrls(conf);
+      this.wasbDelegationTokenManager = new RemoteWasbDelegationTokenManager(conf);
     }
   }
 
@@ -3002,31 +2997,7 @@ public class NativeAzureFileSystem extends FileSystem {
   @Override
   public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
     if (kerberosSupportEnabled) {
-      try {
-        final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-        UserGroupInformation connectUgi = ugi.getRealUser();
-        final UserGroupInformation proxyUser = connectUgi;
-        if (connectUgi == null) {
-          connectUgi = ugi;
-        }
-        connectUgi.checkTGTAndReloginFromKeytab();
-        return connectUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
-          @Override
-          public Token<?> run() throws Exception {
-            return authURL.getDelegationToken(new URL(credServiceUrl
-                    + Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
-                authToken, renewer, (proxyUser != null)? ugi.getShortUserName(): null);
-          }
-        });
-      } catch (Exception ex) {
-        LOG.error("Error in fetching the delegation token from remote service",
-            ex);
-        if (ex instanceof IOException) {
-          throw (IOException) ex;
-        } else {
-          throw new IOException(ex);
-        }
-      }
+      return wasbDelegationTokenManager.getDelegationToken(renewer);
     } else {
       return super.getDelegationToken(renewer);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java
index 0e9c700..87f3b0b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteSASKeyGeneratorImpl.java
@@ -21,20 +21,16 @@ package org.apache.hadoop.fs.azure;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
+import java.util.List;
 
 import com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.azure.security.Constants;
-import org.apache.hadoop.fs.azure.security.SecurityUtils;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.Authenticator;
-import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
+
+import org.apache.http.NameValuePair;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.utils.URIBuilder;
 
@@ -56,56 +52,65 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(AzureNativeFileSystemStore.class);
-
   private static final ObjectReader RESPONSE_READER = new ObjectMapper()
       .readerFor(RemoteSASKeyGenerationResponse.class);
 
   /**
+   * Configuration parameter name expected in the Configuration
+   * object to provide the url of the remote service {@value}
+   */
+  public static final String KEY_CRED_SERVICE_URLS =
+      "fs.azure.cred.service.urls";
+  /**
+   * Configuration key to enable http retry policy for SAS Key generation. {@value}
+   */
+  public static final String
+      SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY =
+      "fs.azure.saskeygenerator.http.retry.policy.enabled";
+  /**
+   * Configuration key for SAS Key Generation http retry policy spec. {@value}
+   */
+  public static final String
+      SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY =
+      "fs.azure.saskeygenerator.http.retry.policy.spec";
+  /**
    * Container SAS Key generation OP name. {@value}
    */
   private static final String CONTAINER_SAS_OP = "GET_CONTAINER_SAS";
-
   /**
    * Relative Blob SAS Key generation OP name. {@value}
    */
   private static final String BLOB_SAS_OP = "GET_RELATIVE_BLOB_SAS";
-
   /**
    * Query parameter specifying the expiry period to be used for sas key
    * {@value}
    */
   private static final String SAS_EXPIRY_QUERY_PARAM_NAME = "sas_expiry";
-
   /**
    * Query parameter name for the storage account. {@value}
    */
   private static final String STORAGE_ACCOUNT_QUERY_PARAM_NAME =
       "storage_account";
-
   /**
    * Query parameter name for the storage account container. {@value}
    */
-  private static final String CONTAINER_QUERY_PARAM_NAME =
-      "container";
-
-  /**
-   * Query parameter name for user info {@value}
-   */
-  private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
-      "delegation";
-
+  private static final String CONTAINER_QUERY_PARAM_NAME = "container";
   /**
    * Query parameter name for the relative path inside the storage
    * account container. {@value}
    */
-  private static final String RELATIVE_PATH_QUERY_PARAM_NAME =
-      "relative_path";
+  private static final String RELATIVE_PATH_QUERY_PARAM_NAME = "relative_path";
+  /**
+   * Default SAS Key Generation Remote http client retry policy spec. {@value}
+   */
+  private static final String
+      SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
+      "1000,3,10000,2";
 
-  private String delegationToken;
-  private String credServiceUrl = "";
   private WasbRemoteCallHelper remoteCallHelper = null;
-  private boolean isSecurityEnabled;
   private boolean isKerberosSupportEnabled;
+  private RetryPolicy retryPolicy;
+  private String[] commaSeparatedUrls;
 
   public RemoteSASKeyGeneratorImpl(Configuration conf) {
     super(conf);
@@ -114,180 +119,111 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
   public void initialize(Configuration conf) throws IOException {
 
     LOG.debug("Initializing RemoteSASKeyGeneratorImpl instance");
-    setDelegationToken();
-    try {
-      credServiceUrl = SecurityUtils.getCredServiceUrls(conf);
-    } catch (UnknownHostException e) {
-      final String msg = "Invalid CredService Url, configure it correctly";
-      LOG.error(msg, e);
-      throw new IOException(msg, e);
-    }
 
-    if (credServiceUrl == null || credServiceUrl.isEmpty()) {
-      final String msg = "CredService Url not found in configuration to "
-          + "initialize RemoteSASKeyGenerator";
-      LOG.error(msg);
-      throw new IOException(msg);
+    this.retryPolicy = RetryUtils.getMultipleLinearRandomRetry(conf,
+        SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY, true,
+        SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
+        SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
+
+    this.isKerberosSupportEnabled =
+        conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
+    this.commaSeparatedUrls = conf.getTrimmedStrings(KEY_CRED_SERVICE_URLS);
+    if (this.commaSeparatedUrls == null || this.commaSeparatedUrls.length <= 0) {
+      throw new IOException(
+          KEY_CRED_SERVICE_URLS + " config not set" + " in configuration.");
+    }
+    if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled()) {
+      this.remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, false);
+    } else {
+      this.remoteCallHelper = new WasbRemoteCallHelper(retryPolicy);
     }
-
-    remoteCallHelper = new WasbRemoteCallHelper();
-    this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
-    this.isKerberosSupportEnabled = conf.getBoolean(
-        Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
     LOG.debug("Initialization of RemoteSASKeyGenerator instance successful");
   }
 
   @Override
-  public URI getContainerSASUri(String storageAccount, String container)
-      throws SASKeyGenerationException {
+  public URI getContainerSASUri(String storageAccount,
+      String container) throws SASKeyGenerationException {
+    RemoteSASKeyGenerationResponse sasKeyResponse = null;
     try {
-      LOG.debug("Generating Container SAS Key for Container {} "
-          + "inside Storage Account {} ", container, storageAccount);
-      setDelegationToken();
-      URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
+      URIBuilder uriBuilder = new URIBuilder();
       uriBuilder.setPath("/" + CONTAINER_SAS_OP);
-      uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
-          storageAccount);
-      uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
-          container);
-      uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
-          + getSasKeyExpiryPeriod());
-      if (isSecurityEnabled && StringUtils.isNotEmpty(delegationToken)) {
-        uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
-            this.delegationToken);
-      }
+      uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME, storageAccount);
+      uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME, container);
+      uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME,
+          "" + getSasKeyExpiryPeriod());
 
-      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-      UserGroupInformation connectUgi = ugi.getRealUser();
-      if (connectUgi == null) {
-        connectUgi = ugi;
+      sasKeyResponse = makeRemoteRequest(commaSeparatedUrls, uriBuilder.getPath(),
+              uriBuilder.getQueryParams());
+
+      if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
+        return new URI(sasKeyResponse.getSasKey());
       } else {
-        uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
+        throw new SASKeyGenerationException(
+            "Remote Service encountered error in SAS Key generation : "
+                + sasKeyResponse.getResponseMessage());
       }
-      return getSASKey(uriBuilder.build(), connectUgi);
     } catch (URISyntaxException uriSyntaxEx) {
-      throw new SASKeyGenerationException("Encountered URISyntaxException "
-          + "while building the HttpGetRequest to remote cred service",
+      throw new SASKeyGenerationException("Encountered URISyntaxException"
+          + " while building the HttpGetRequest to remote service for ",
           uriSyntaxEx);
-    } catch (IOException e) {
-      throw new SASKeyGenerationException("Encountered IOException"
-          + " while building the HttpGetRequest to remote service", e);
     }
   }
 
   @Override
-  public URI getRelativeBlobSASUri(String storageAccount, String container,
-      String relativePath) throws SASKeyGenerationException {
+  public URI getRelativeBlobSASUri(String storageAccount,
+      String container, String relativePath) throws SASKeyGenerationException {
+
     try {
-      LOG.debug("Generating RelativePath SAS Key for relativePath {} inside"
-              + " Container {} inside Storage Account {} ",
-          relativePath, container, storageAccount);
-      setDelegationToken();
-      URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
+      URIBuilder uriBuilder = new URIBuilder();
       uriBuilder.setPath("/" + BLOB_SAS_OP);
-      uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
-          storageAccount);
-      uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
-          container);
-      uriBuilder.addParameter(RELATIVE_PATH_QUERY_PARAM_NAME,
-          relativePath);
-      uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
-          + getSasKeyExpiryPeriod());
-
-      if (isSecurityEnabled && StringUtils.isNotEmpty(
-          delegationToken)) {
-        uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
-            this.delegationToken);
-      }
-
-      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-      UserGroupInformation connectUgi = ugi.getRealUser();
-      if (connectUgi == null) {
-        connectUgi = ugi;
+      uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME, storageAccount);
+      uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME, container);
+      uriBuilder.addParameter(RELATIVE_PATH_QUERY_PARAM_NAME, relativePath);
+      uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME,
+          "" + getSasKeyExpiryPeriod());
+
+      RemoteSASKeyGenerationResponse sasKeyResponse =
+          makeRemoteRequest(commaSeparatedUrls, uriBuilder.getPath(),
+              uriBuilder.getQueryParams());
+      if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
+        return new URI(sasKeyResponse.getSasKey());
       } else {
-        uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
+        throw new SASKeyGenerationException(
+            "Remote Service encountered error in SAS Key generation : "
+                + sasKeyResponse.getResponseMessage());
       }
-      return getSASKey(uriBuilder.build(), connectUgi);
     } catch (URISyntaxException uriSyntaxEx) {
       throw new SASKeyGenerationException("Encountered URISyntaxException"
           + " while building the HttpGetRequest to " + " remote service",
           uriSyntaxEx);
-    } catch (IOException e) {
-      throw new SASKeyGenerationException("Encountered IOException"
-          + " while building the HttpGetRequest to remote service", e);
-    }
-  }
-
-  private URI getSASKey(final URI uri, UserGroupInformation connectUgi)
-      throws URISyntaxException, SASKeyGenerationException {
-    final RemoteSASKeyGenerationResponse sasKeyResponse;
-    try {
-      sasKeyResponse = connectUgi.doAs(
-          new PrivilegedExceptionAction<RemoteSASKeyGenerationResponse>() {
-            @Override
-            public RemoteSASKeyGenerationResponse run() throws Exception {
-              AuthenticatedURL.Token token = null;
-              if (isKerberosSupportEnabled && UserGroupInformation
-                  .isSecurityEnabled() && (delegationToken == null
-                  || delegationToken.isEmpty())) {
-                token = new AuthenticatedURL.Token();
-                final Authenticator kerberosAuthenticator =
-                    new KerberosDelegationTokenAuthenticator();
-                try {
-                  kerberosAuthenticator.authenticate(uri.toURL(), token);
-                  Validate.isTrue(token.isSet(),
-                      "Authenticated Token is NOT present. "
-                          + "The request cannot proceed.");
-                } catch (AuthenticationException e) {
-                  throw new IOException(
-                      "Authentication failed in check authorization", e);
-                }
-              }
-              return makeRemoteRequest(uri,
-                  (token != null ? token.toString() : null));
-            }
-          });
-    } catch (InterruptedException | IOException e) {
-      final String msg = "Error fetching SAS Key from Remote Service: " + uri;
-      LOG.error(msg, e);
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      throw new SASKeyGenerationException(msg, e);
-    }
-
-    if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
-      return new URI(sasKeyResponse.getSasKey());
-    } else {
-      throw new SASKeyGenerationException(
-          "Remote Service encountered error in SAS Key generation : "
-              + sasKeyResponse.getResponseMessage());
     }
   }
 
   /**
    * Helper method to make a remote request.
-   * @param uri - Uri to use for the remote request
-   * @param token - hadoop.auth token for the remote request
+   *
+   * @param urls        - Urls to use for the remote request
+   * @param path        - request path (the SAS operation) for the remote request
+   * @param queryParams - queryParams to be used.
    * @return RemoteSASKeyGenerationResponse
    */
-  private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri,
-      String token) throws SASKeyGenerationException {
+  private RemoteSASKeyGenerationResponse makeRemoteRequest(String[] urls,
+      String path, List<NameValuePair> queryParams)
+      throws SASKeyGenerationException {
 
     try {
-      HttpGet httpGet = new HttpGet(uri);
-      if (token != null) {
-        httpGet.setHeader("Cookie", AuthenticatedURL.AUTH_COOKIE + "=" + token);
-      }
-      String responseBody = remoteCallHelper.makeRemoteGetRequest(httpGet);
+      String responseBody = remoteCallHelper
+          .makeRemoteRequest(urls, path, queryParams, HttpGet.METHOD_NAME);
       return RESPONSE_READER.readValue(responseBody);
+
     } catch (WasbRemoteCallException remoteCallEx) {
       throw new SASKeyGenerationException("Encountered RemoteCallException"
           + " while retrieving SAS key from remote service", remoteCallEx);
     } catch (JsonParseException jsonParserEx) {
       throw new SASKeyGenerationException("Encountered JsonParseException "
           + "while parsing the response from remote"
-          + " service into RemoteSASKeyGenerationResponse object", jsonParserEx);
+          + " service into RemoteSASKeyGenerationResponse object",
+          jsonParserEx);
     } catch (JsonMappingException jsonMappingEx) {
       throw new SASKeyGenerationException("Encountered JsonMappingException"
           + " while mapping the response from remote service into "
@@ -297,10 +233,6 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
           + "accessing remote service to retrieve SAS Key", ioEx);
     }
   }
-
-  private void setDelegationToken() throws IOException {
-    this.delegationToken = SecurityUtils.getDelegationTokenFromCredentials();
-  }
 }
 
 /**
@@ -309,9 +241,9 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
  * The remote SAS Key generation service is expected to
  * return SAS key in json format:
  * {
- *    "responseCode" : 0 or non-zero <int>,
- *    "responseMessage" : relavant message on failure <String>,
- *    "sasKey" : Requested SAS Key <String>
+ *   "responseCode" : 0 or non-zero <int>,
+ *   "responseMessage" : relevant message on failure <String>,
+ *   "sasKey" : Requested SAS Key <String>
  * }
  */
 class RemoteSASKeyGenerationResponse {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
index b1e671d..e2d515c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
@@ -24,23 +24,17 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.azure.security.Constants;
-import org.apache.hadoop.fs.azure.security.SecurityUtils;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.Authenticator;
-import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.utils.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
-import java.security.PrivilegedExceptionAction;
 
 import static org.apache.hadoop.fs.azure.WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE;
 
@@ -55,54 +49,59 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
   public static final Logger LOG = LoggerFactory
       .getLogger(RemoteWasbAuthorizerImpl.class);
   private static final ObjectReader RESPONSE_READER = new ObjectMapper()
-      .readerFor(RemoteAuthorizerResponse.class);
-
-  private String remoteAuthorizerServiceUrl = null;
+      .readerFor(RemoteWasbAuthorizerResponse.class);
 
   /**
    * Configuration parameter name expected in the Configuration object to
-   * provide the url of the remote service. {@value}
+   * provide the urls of the remote service instances. {@value}
    */
-  public static final String KEY_REMOTE_AUTH_SERVICE_URL =
-      "fs.azure.authorization.remote.service.url";
-
+  public static final String KEY_REMOTE_AUTH_SERVICE_URLS =
+      "fs.azure.authorization.remote.service.urls";
   /**
    * Authorization operation OP name in the remote service {@value}
    */
-  private static final String CHECK_AUTHORIZATION_OP =
-      "CHECK_AUTHORIZATION";
-
+  private static final String CHECK_AUTHORIZATION_OP = "CHECK_AUTHORIZATION";
   /**
    * Query parameter specifying the access operation type. {@value}
    */
   private static final String ACCESS_OPERATION_QUERY_PARAM_NAME =
       "operation_type";
-
   /**
    * Query parameter specifying the wasb absolute path. {@value}
    */
   private static final String WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME =
       "wasb_absolute_path";
+  /**
+   *  Query parameter name for sending owner of the specific resource {@value}
+   */
+  private static final String WASB_RESOURCE_OWNER_QUERY_PARAM_NAME =
+      "wasb_resource_owner";
 
   /**
-   * Query parameter name for user info {@value}
+   * Authorization Remote http client retry policy enabled configuration key. {@value}
    */
-  private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
-      "delegation";
+  private static final String AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY =
+      "fs.azure.authorizer.http.retry.policy.enabled";
 
   /**
-   *  Query parameter name for sending owner of the specific resource {@value}
+   * Authorization Remote http client retry policy spec. {@value}
    */
-  private static final String WASB_RESOURCE_OWNER_QUERY_PARAM_NAME =
-      "wasb_resource_owner";
+  private static final String AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_SPEC =
+      "fs.azure.authorizer.http.retry.policy.spec";
+
+  /**
+   * Authorization Remote http client retry policy spec default value. {@value}
+   */
+  private static final String AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
+      "1000,3,10000,2";
 
   private WasbRemoteCallHelper remoteCallHelper = null;
-  private String delegationToken;
-  private boolean isSecurityEnabled;
   private boolean isKerberosSupportEnabled;
+  private RetryPolicy retryPolicy;
+  private String[] commaSeparatedUrls = null;
 
-  @VisibleForTesting
-  public void updateWasbRemoteCallHelper(WasbRemoteCallHelper helper) {
+  @VisibleForTesting public void updateWasbRemoteCallHelper(
+      WasbRemoteCallHelper helper) {
     this.remoteCallHelper = helper;
   }
 
@@ -110,114 +109,63 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
   public void init(Configuration conf)
       throws WasbAuthorizationException, IOException {
     LOG.debug("Initializing RemoteWasbAuthorizerImpl instance");
-    setDelegationToken();
-    remoteAuthorizerServiceUrl = SecurityUtils
-        .getRemoteAuthServiceUrls(conf);
-
-    if (remoteAuthorizerServiceUrl == null
-          || remoteAuthorizerServiceUrl.isEmpty()) {
-      throw new WasbAuthorizationException(
-          "fs.azure.authorization.remote.service.url config not set"
-              + " in configuration.");
+    this.isKerberosSupportEnabled =
+        conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
+    this.commaSeparatedUrls =
+        conf.getTrimmedStrings(KEY_REMOTE_AUTH_SERVICE_URLS);
+    if (this.commaSeparatedUrls == null
+        || this.commaSeparatedUrls.length <= 0) {
+      throw new IOException(KEY_REMOTE_AUTH_SERVICE_URLS + " config not set"
+          + " in configuration.");
+    }
+    this.retryPolicy = RetryUtils.getMultipleLinearRandomRetry(conf,
+        AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY, true,
+        AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_SPEC,
+        AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
+    if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled()) {
+      this.remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, false);
+    } else {
+      this.remoteCallHelper = new WasbRemoteCallHelper(retryPolicy);
     }
-
-    this.remoteCallHelper = new WasbRemoteCallHelper();
-    this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
-    this.isKerberosSupportEnabled = conf
-        .getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
   }
 
   @Override
   public boolean authorize(String wasbAbsolutePath, String accessType, String resourceOwner)
       throws WasbAuthorizationException, IOException {
 
-      try {
-
+    try {
         /* Make an exception for the internal -RenamePending files */
-        if (wasbAbsolutePath.endsWith(NativeAzureFileSystem.FolderRenamePending.SUFFIX)) {
-          return true;
-        }
-
-        setDelegationToken();
-        URIBuilder uriBuilder = new URIBuilder(remoteAuthorizerServiceUrl);
-        uriBuilder.setPath("/" + CHECK_AUTHORIZATION_OP);
-        uriBuilder.addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME,
-            wasbAbsolutePath);
-        uriBuilder.addParameter(ACCESS_OPERATION_QUERY_PARAM_NAME,
-            accessType);
-        if (isSecurityEnabled && StringUtils.isNotEmpty(delegationToken)) {
-          uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
-              delegationToken);
-        }
-        if (resourceOwner != null && StringUtils.isNotEmpty(resourceOwner)) {
-          uriBuilder.addParameter(WASB_RESOURCE_OWNER_QUERY_PARAM_NAME,
-              resourceOwner);
-        }
-
-        String responseBody = null;
-        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-        UserGroupInformation connectUgi = ugi.getRealUser();
-        if (connectUgi == null) {
-          connectUgi = ugi;
-        } else {
-          uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
-        }
-
-        try {
-          responseBody = connectUgi
-              .doAs(new PrivilegedExceptionAction<String>() {
-                @Override
-                public String run() throws Exception {
-                  AuthenticatedURL.Token token = null;
-                  HttpGet httpGet = new HttpGet(uriBuilder.build());
-                  if (isKerberosSupportEnabled && UserGroupInformation
-                      .isSecurityEnabled() && (delegationToken == null
-                      || delegationToken.isEmpty())) {
-                    token = new AuthenticatedURL.Token();
-                    final Authenticator kerberosAuthenticator = new KerberosDelegationTokenAuthenticator();
-                    try {
-                      kerberosAuthenticator
-                          .authenticate(uriBuilder.build().toURL(), token);
-                      Validate.isTrue(token.isSet(),
-                          "Authenticated Token is NOT present. The request cannot proceed.");
-                    } catch (AuthenticationException e){
-                      throw new IOException("Authentication failed in check authorization", e);
-                    }
-                    if (token != null) {
-                      httpGet.setHeader("Cookie",
-                          AuthenticatedURL.AUTH_COOKIE + "=" + token);
-                    }
-                  }
-                  return remoteCallHelper.makeRemoteGetRequest(httpGet);
-                }
-              });
-        } catch (InterruptedException e) {
-          LOG.error("Error in check authorization", e);
-          throw new WasbAuthorizationException("Error in check authorize", e);
-        }
-
-        RemoteAuthorizerResponse authorizerResponse =
-            RESPONSE_READER.readValue(responseBody);
-
-        if (authorizerResponse == null) {
-          throw new WasbAuthorizationException(
-              "RemoteAuthorizerResponse object null from remote call");
-        } else if (authorizerResponse.getResponseCode()
-            == REMOTE_CALL_SUCCESS_CODE) {
-          return authorizerResponse.getAuthorizationResult();
-        } else {
-          throw new WasbAuthorizationException("Remote authorization"
-              + " service encountered an error "
-              + authorizerResponse.getResponseMessage());
-        }
-      } catch (URISyntaxException | WasbRemoteCallException
-          | JsonParseException | JsonMappingException ex) {
-        throw new WasbAuthorizationException(ex);
+      final URIBuilder uriBuilder = new URIBuilder();
+      uriBuilder.setPath("/" + CHECK_AUTHORIZATION_OP);
+      uriBuilder
+          .addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME, wasbAbsolutePath);
+      uriBuilder.addParameter(ACCESS_OPERATION_QUERY_PARAM_NAME, accessType);
+      if (resourceOwner != null && StringUtils.isNotEmpty(resourceOwner)) {
+        uriBuilder.addParameter(WASB_RESOURCE_OWNER_QUERY_PARAM_NAME,
+            resourceOwner);
       }
-  }
 
-  private void setDelegationToken() throws IOException {
-    this.delegationToken = SecurityUtils.getDelegationTokenFromCredentials();
+      String responseBody = remoteCallHelper
+          .makeRemoteRequest(commaSeparatedUrls, uriBuilder.getPath(),
+              uriBuilder.getQueryParams(), HttpGet.METHOD_NAME);
+
+      RemoteWasbAuthorizerResponse authorizerResponse = RESPONSE_READER
+          .readValue(responseBody);
+
+      if (authorizerResponse == null) {
+        throw new WasbAuthorizationException(
+            "RemoteWasbAuthorizerResponse object null from remote call");
+      } else if (authorizerResponse.getResponseCode()
+          == REMOTE_CALL_SUCCESS_CODE) {
+        return authorizerResponse.getAuthorizationResult();
+      } else {
+        throw new WasbAuthorizationException(
+            "Remote authorization" + " service encountered an error "
+                + authorizerResponse.getResponseMessage());
+      }
+    } catch (WasbRemoteCallException | JsonParseException | JsonMappingException ex) {
+      throw new WasbAuthorizationException(ex);
+    }
   }
 }
 
@@ -227,30 +175,19 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
  * The remote service is expected to return the authorization
  * response in the following JSON format
  * {
- *    "responseCode" : 0 or non-zero <int>,
- *    "responseMessage" : relevant message of failure <String>
- *    "authorizationResult" : authorization result <boolean>
- *                            true - if auhorization allowed
- *                            false - otherwise.
- *
+ *   "responseCode" : 0 or non-zero <int>,
+ *   "responseMessage" : relevant message of failure <String>
+ *   "authorizationResult" : authorization result <boolean>
+ *   true - if authorization allowed
+ *   false - otherwise.
  * }
  */
-class RemoteAuthorizerResponse {
+class RemoteWasbAuthorizerResponse {
 
   private int responseCode;
   private boolean authorizationResult;
   private String responseMessage;
 
-  public RemoteAuthorizerResponse(int responseCode,
-      boolean authorizationResult, String message) {
-    this.responseCode = responseCode;
-    this.authorizationResult = authorizationResult;
-    this.responseMessage = message;
-  }
-
-  public RemoteAuthorizerResponse() {
-  }
-
   public int getResponseCode() {
     return responseCode;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureWasbRemoteCallHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureWasbRemoteCallHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureWasbRemoteCallHelper.java
new file mode 100644
index 0000000..7f8bc0e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureWasbRemoteCallHelper.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.commons.lang.Validate;
+import org.apache.hadoop.fs.azure.security.Constants;
+import org.apache.hadoop.fs.azure.security.WasbDelegationTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+
+/**
+ * Helper class that has constants and helper methods
+ * used in WASB when integrating with a remote http cred
+ * service which uses Kerberos and delegation tokens.
+ * Currently, remote service will be used to generate
+ * SAS keys, authorization and delegation token operations.
+ */
+public class SecureWasbRemoteCallHelper extends WasbRemoteCallHelper {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SecureWasbRemoteCallHelper.class);
+  /**
+   * Delegation token query parameter to be used when making rest call.
+   */
+  private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME = "delegation";
+
+  /**
+   * Delegation token to be used for making the remote call.
+   */
+  private Token<?> delegationToken = null;
+
+  /**
+   * Whether the remote HTTP call always requires Kerberos authentication, even if a delegation token is present.
+   */
+  private boolean alwaysRequiresKerberosAuth;
+
+  public SecureWasbRemoteCallHelper(RetryPolicy retryPolicy,
+      boolean alwaysRequiresKerberosAuth) {
+    super(retryPolicy);
+    this.alwaysRequiresKerberosAuth = alwaysRequiresKerberosAuth;
+  }
+
+  @Override
+  public String makeRemoteRequest(final String[] urls,
+      final String path, final List<NameValuePair> queryParams,
+      final String httpMethod) throws IOException {
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    UserGroupInformation connectUgi = ugi.getRealUser();
+    if (connectUgi == null) {
+      connectUgi = ugi;
+    }
+    if (delegationToken == null) {
+      connectUgi.checkTGTAndReloginFromKeytab();
+    }
+    String s = null;
+    try {
+      s = connectUgi.doAs(new PrivilegedExceptionAction<String>() {
+        @Override public String run() throws Exception {
+          return retryableRequest(urls, path, queryParams, httpMethod);
+        }
+      });
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(e.getMessage(), e);
+    }
+    return s;
+  }
+
+  @Override
+  public HttpUriRequest getHttpRequest(String[] urls, String path,
+      List<NameValuePair> queryParams, int urlIndex, String httpMethod)
+      throws URISyntaxException, IOException {
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    UserGroupInformation connectUgi = ugi.getRealUser();
+    if (connectUgi != null) {
+      queryParams.add(new NameValuePair() {
+        @Override public String getName() {
+          return Constants.DOAS_PARAM;
+        }
+
+        @Override public String getValue() {
+          return ugi.getShortUserName();
+        }
+      });
+    }
+
+    final Token delegationToken = getDelegationToken(ugi);
+    if (!alwaysRequiresKerberosAuth && delegationToken != null) {
+      final String delegationTokenEncodedUrlString =
+          delegationToken.encodeToUrlString();
+      queryParams.add(new NameValuePair() {
+        @Override public String getName() {
+          return DELEGATION_TOKEN_QUERY_PARAM_NAME;
+        }
+
+        @Override public String getValue() {
+          return delegationTokenEncodedUrlString;
+        }
+      });
+    }
+
+    URIBuilder uriBuilder =
+        new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
+    HttpUriRequest httpUriRequest = null;
+    switch (httpMethod) {
+    case HttpPut.METHOD_NAME:
+      httpUriRequest = new HttpPut(uriBuilder.build());
+      break;
+    case HttpPost.METHOD_NAME:
+      httpUriRequest = new HttpPost(uriBuilder.build());
+      break;
+    default:
+      httpUriRequest = new HttpGet(uriBuilder.build());
+      break;
+    }
+
+    LOG.debug("SecureWasbRemoteCallHelper#getHttpRequest() {}",
+        uriBuilder.build().toURL());
+    if (alwaysRequiresKerberosAuth || delegationToken == null) {
+      AuthenticatedURL.Token token = new AuthenticatedURL.Token();
+      final Authenticator kerberosAuthenticator =
+          new KerberosDelegationTokenAuthenticator();
+      try {
+        kerberosAuthenticator.authenticate(uriBuilder.build().toURL(), token);
+      } catch (AuthenticationException e) {
+        throw new WasbRemoteCallException(
+            Constants.AUTHENTICATION_FAILED_ERROR_MESSAGE, e);
+      }
+      Validate.isTrue(token.isSet(),
+          "Authenticated Token is NOT present. The request cannot proceed.");
+
+      httpUriRequest.setHeader("Cookie",
+          AuthenticatedURL.AUTH_COOKIE + "=" + token);
+    }
+    return httpUriRequest;
+  }
+
+  private synchronized Token<?> getDelegationToken(
+      UserGroupInformation userGroupInformation) throws IOException {
+    if (this.delegationToken == null) {
+      Token<?> token = null;
+      for (Token iterToken : userGroupInformation.getTokens()) {
+        if (iterToken.getKind()
+            .equals(WasbDelegationTokenIdentifier.TOKEN_KIND)) {
+          token = iterToken;
+          LOG.debug("{} token found in cache : {}",
+              WasbDelegationTokenIdentifier.TOKEN_KIND, iterToken);
+          break;
+        }
+      }
+      LOG.debug("UGI Information: {}", userGroupInformation.toString());
+
+      // ugi tokens are usually indicative of a task which can't
+      // refetch tokens.  even if ugi has credentials, don't attempt
+      // to get another token to match hdfs/rpc behavior
+      if (token != null) {
+        LOG.debug("Using UGI token: {}", token);
+        setDelegationToken(token);
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Delegation token from cache - {}", delegationToken != null
+          ? delegationToken.encodeToUrlString()
+          : "null");
+    }
+    return this.delegationToken;
+  }
+
+  private <T extends TokenIdentifier> void setDelegationToken(
+      final Token<T> token) {
+    synchronized (this) {
+      this.delegationToken = token;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
index b43e5ae..7c26e8a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,19 +19,31 @@
 package org.apache.hadoop.fs.azure;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.azure.security.Constants;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.http.Header;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
 import org.apache.http.StatusLine;
-import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
+import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Random;
 
 /**
  * Helper class the has constants and helper methods
@@ -39,101 +51,212 @@ import java.nio.charset.StandardCharsets;
  * service. Currently, remote service will be used to generate
  * SAS keys.
  */
-class WasbRemoteCallHelper {
+public class WasbRemoteCallHelper {
 
+  public static final Logger LOG =
+      LoggerFactory.getLogger(WasbRemoteCallHelper.class);
   /**
    * Return code when the remote call is successful. {@value}
    */
   public static final int REMOTE_CALL_SUCCESS_CODE = 0;
 
   /**
+   * Application Json content type.
+   */
+  private static final String APPLICATION_JSON = "application/json";
+
+  /**
+   * Max content length of the response.
+   */
+  private static final int MAX_CONTENT_LENGTH = 1024;
+
+  /**
    * Client instance to be used for making the remote call.
    */
   private HttpClient client = null;
 
+  private Random random = new Random();
+
+  private RetryPolicy retryPolicy = null;
+
+  public WasbRemoteCallHelper(RetryPolicy retryPolicy) {
+    this.client = HttpClientBuilder.create().build();
+    this.retryPolicy = retryPolicy;
+  }
+
   @VisibleForTesting
   public void updateHttpClient(HttpClient client) {
     this.client = client;
   }
 
-  public WasbRemoteCallHelper() {
-    this.client = HttpClientBuilder.create().build();
-  }
-
   /**
    * Helper method to make remote HTTP Get request.
-   * @param getRequest - HttpGet request object constructed by caller.
+   *
+   * @param urls        - Service urls to be used, if one fails try another.
+   * @param path        - URL endpoint for the resource.
+   * @param queryParams - list of query parameters
+   * @param httpMethod  - http Method to be used.
    * @return Http Response body returned as a string. The caller
-   *  is expected to semantically understand the response.
-   * @throws WasbRemoteCallException
-   * @throws IOException
+   * is expected to semantically understand the response.
+   * @throws IOException when there is an error in executing the remote http request.
    */
-  public String makeRemoteGetRequest(HttpGet getRequest)
-      throws WasbRemoteCallException, IOException {
+  public String makeRemoteRequest(String[] urls, String path,
+      List<NameValuePair> queryParams, String httpMethod) throws IOException {
 
-    try {
+    return retryableRequest(urls, path, queryParams, httpMethod);
+  }
 
-      final String APPLICATION_JSON = "application/json";
-      final int MAX_CONTENT_LENGTH = 1024;
+  protected String retryableRequest(String[] urls, String path,
+      List<NameValuePair> queryParams, String httpMethod) throws IOException {
+    HttpResponse response = null;
+    HttpUriRequest httpRequest = null;
 
-      getRequest.setHeader("Accept", APPLICATION_JSON);
+    for (int retry = 0, index =
+         random.nextInt(urls.length);; retry++, index++) {
+      if (index >= urls.length) {
+        index = index % urls.length;
+      }
 
-      HttpResponse response = client.execute(getRequest);
+      try {
+        httpRequest =
+            getHttpRequest(urls, path, queryParams, index, httpMethod);
 
-      StatusLine statusLine = response.getStatusLine();
-      if (statusLine == null || statusLine.getStatusCode() != HttpStatus.SC_OK) {
-        throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
-            ((statusLine!=null) ? statusLine.toString() : "NULL")
-        );
-      }
+        httpRequest.setHeader("Accept", APPLICATION_JSON);
+        response = client.execute(httpRequest);
+        StatusLine statusLine = response.getStatusLine();
+        if (statusLine == null
+            || statusLine.getStatusCode() != HttpStatus.SC_OK) {
+          throw new WasbRemoteCallException(
+              httpRequest.getURI().toString() + ":" + ((statusLine != null)
+                                                       ? statusLine.toString()
+                                                       : "NULL"));
+        }
 
-      Header contentTypeHeader = response.getFirstHeader("Content-Type");
-      if (contentTypeHeader == null
-          || !APPLICATION_JSON.equals(contentTypeHeader.getValue())) {
-        throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
-            "Content-Type mismatch: expected: " + APPLICATION_JSON +
-            ", got " + ((contentTypeHeader!=null) ? contentTypeHeader.getValue() : "NULL")
-        );
-      }
+        Header contentTypeHeader = response.getFirstHeader("Content-Type");
+        if (contentTypeHeader == null || !APPLICATION_JSON
+            .equals(contentTypeHeader.getValue())) {
+          throw new WasbRemoteCallException(
+              httpRequest.getURI().toString() + ":"
+                  + "Content-Type mismatch: expected: " + APPLICATION_JSON
+                  + ", got " + ((contentTypeHeader != null) ? contentTypeHeader
+                  .getValue() : "NULL"));
+        }
 
-      Header contentLengthHeader = response.getFirstHeader("Content-Length");
-      if (contentLengthHeader == null) {
-        throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
-            "Content-Length header missing"
-        );
-      }
+        Header contentLengthHeader = response.getFirstHeader("Content-Length");
+        if (contentLengthHeader == null) {
+          throw new WasbRemoteCallException(
+              httpRequest.getURI().toString() + ":"
+                  + "Content-Length header missing");
+        }
 
-      try {
-        if (Integer.parseInt(contentLengthHeader.getValue()) > MAX_CONTENT_LENGTH) {
-          throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
-              "Content-Length:" + contentLengthHeader.getValue() +
-              "exceeded max:" + MAX_CONTENT_LENGTH
-          );
+        try {
+          if (Integer.parseInt(contentLengthHeader.getValue())
+              > MAX_CONTENT_LENGTH) {
+            throw new WasbRemoteCallException(
+                httpRequest.getURI().toString() + ":" + "Content-Length:"
+                    + contentLengthHeader.getValue() + "exceeded max:"
+                    + MAX_CONTENT_LENGTH);
+          }
+        } catch (NumberFormatException nfe) {
+          throw new WasbRemoteCallException(
+              httpRequest.getURI().toString() + ":"
+                  + "Invalid Content-Length value :" + contentLengthHeader
+                  .getValue());
+        }
+
+        BufferedReader rd = null;
+        StringBuilder responseBody = new StringBuilder();
+        try {
+          rd = new BufferedReader(
+              new InputStreamReader(response.getEntity().getContent(),
+                  StandardCharsets.UTF_8));
+          String responseLine = "";
+          while ((responseLine = rd.readLine()) != null) {
+            responseBody.append(responseLine);
+          }
+        } finally {
+          rd.close();
+        }
+        return responseBody.toString();
+      } catch (URISyntaxException uriSyntaxEx) {
+        throw new WasbRemoteCallException("Encountered URISyntaxException "
+            + "while building the HttpGetRequest to remote service",
+            uriSyntaxEx);
+      } catch (IOException e) {
+        LOG.debug(e.getMessage(), e);
+        try {
+          shouldRetry(e, retry, (httpRequest != null)
+                                ? httpRequest.getURI().toString()
+                                : urls[index]);
+        } catch (IOException ioex) {
+          String message =
+              "Encountered error while making remote call to " + String
+                  .join(",", urls) + " retried " + retry + " time(s).";
+          LOG.error(message, ioex);
+          throw new WasbRemoteCallException(message, ioex);
         }
       }
-      catch (NumberFormatException nfe) {
-        throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
-            "Invalid Content-Length value :" + contentLengthHeader.getValue()
-        );
-      }
+    }
+  }
+
+  protected HttpUriRequest getHttpRequest(String[] urls, String path,
+      List<NameValuePair> queryParams, int urlIndex, String httpMethod)
+      throws URISyntaxException, IOException {
+    URIBuilder uriBuilder = null;
+    uriBuilder =
+        new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
+    HttpUriRequest httpUriRequest = null;
+    switch (httpMethod) {
+    case HttpPut.METHOD_NAME:
+      httpUriRequest = new HttpPut(uriBuilder.build());
+      break;
+    case HttpPost.METHOD_NAME:
+      httpUriRequest = new HttpPost(uriBuilder.build());
+      break;
+    default:
+      httpUriRequest = new HttpGet(uriBuilder.build());
+      break;
+    }
+    return httpUriRequest;
+  }
+
+  private void shouldRetry(final IOException ioe, final int retry,
+      final String url) throws IOException {
+    CharSequence authenticationExceptionMessage =
+        Constants.AUTHENTICATION_FAILED_ERROR_MESSAGE;
+    if (ioe instanceof WasbRemoteCallException && ioe.getMessage()
+        .equals(authenticationExceptionMessage)) {
+      throw ioe;
+    }
+    try {
+      final RetryPolicy.RetryAction a = (retryPolicy != null)
+                                        ? retryPolicy
+                                            .shouldRetry(ioe, retry, 0, true)
+                                        : RetryPolicy.RetryAction.FAIL;
+
+      boolean isRetry = a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
+      boolean isFailoverAndRetry =
+          a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
+
+      if (isRetry || isFailoverAndRetry) {
+        LOG.debug("Retrying connect to Remote service:{}. Already tried {}"
+                + " time(s); retry policy is {}, " + "delay {}ms.", url, retry,
+            retryPolicy, a.delayMillis);
 
-      BufferedReader rd = new BufferedReader(
-          new InputStreamReader(response.getEntity().getContent(),
-              StandardCharsets.UTF_8));
-      StringBuilder responseBody = new StringBuilder();
-      String responseLine = "";
-      while ((responseLine = rd.readLine()) != null) {
-        responseBody.append(responseLine);
+        Thread.sleep(a.delayMillis);
+        return;
       }
-      rd.close();
-      return responseBody.toString();
-
-    } catch (ClientProtocolException clientProtocolEx) {
-      throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
-          "Encountered ClientProtocolException while making remote call", clientProtocolEx);
-    } catch (IOException ioEx) {
-      throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
-          "Encountered IOException while making remote call", ioEx);
+    } catch(InterruptedIOException e) {
+      LOG.warn(e.getMessage(), e);
+      Thread.currentThread().interrupt();
+      return;
+    } catch (Exception e) {
+      LOG.warn("Original exception is ", ioe);
+      throw new WasbRemoteCallException(e.getMessage(), e);
     }
+    LOG.debug("Not retrying anymore, already retried the urls {} time(s)",
+        retry);
+    throw new WasbRemoteCallException(
+        url + ":" + "Encountered IOException while making remote call", ioe);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/Constants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/Constants.java
index 79531a9..cacdfc5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/Constants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/Constants.java
@@ -27,21 +27,6 @@ public final class Constants {
   }
 
   /**
-   * Configuration parameter name expected in the Configuration
-   * object to provide the url of the remote service {@value}
-   */
-  public static final String KEY_CRED_SERVICE_URL = "fs.azure.cred.service.url";
-  /**
-   * Default port of the remote service used as delegation token manager and Azure storage SAS key generator.
-   */
-  public static final int DEFAULT_CRED_SERVICE_PORT = 50911;
-
-  /**
-   * Default remote delegation token manager endpoint.
-   */
-  public static final String DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT = "/tokenmanager/v1";
-
-  /**
    * The configuration property to enable Kerberos support.
    */
 
@@ -51,4 +36,9 @@ public final class Constants {
    * Parameter to be used for impersonation.
    */
   public static final String DOAS_PARAM = "doas";
+
+  /**
+   * Error message for Authentication failures.
+   */
+  public static final String AUTHENTICATION_FAILED_ERROR_MESSAGE = "Authentication Failed ";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/JsonUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/JsonUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/JsonUtils.java
new file mode 100644
index 0000000..20dd470
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/JsonUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.security;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Utility class to parse JSON.
+ */
+public final class JsonUtils {
+  public static final Logger LOG = LoggerFactory.getLogger(JsonUtils.class);
+
+  private JsonUtils() {
+  }
+
+  public static Map<?, ?> parse(final String jsonString) throws IOException {
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      return mapper.readerFor(Map.class).readValue(jsonString);
+    } catch (Exception e) {
+      LOG.debug("JSON Parsing exception: {} while parsing {}", e.getMessage(),
+          jsonString);
+      if (jsonString.toLowerCase(Locale.ENGLISH).contains("server error")) {
+        LOG.error(
+            "Internal Server Error was encountered while making a request");
+      }
+      throw new IOException("JSON Parsing Error: " + e.getMessage(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/RemoteWasbDelegationTokenManager.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/RemoteWasbDelegationTokenManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/RemoteWasbDelegationTokenManager.java
new file mode 100644
index 0000000..1078f88
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/RemoteWasbDelegationTokenManager.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azure.SecureWasbRemoteCallHelper;
+import org.apache.hadoop.fs.azure.WasbRemoteCallHelper;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.utils.URIBuilder;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ *  Class to manage delegation token operations by making rest call to remote service.
+ */
+public class RemoteWasbDelegationTokenManager
+    implements WasbDelegationTokenManager {
+
+  /**
+   * Configuration parameter name expected in the configuration
+   * object to provide the url of the delegation token service to fetch the delegation tokens.
+   */
+  public static final String KEY_DELEGATION_TOKEN_SERVICE_URLS =
+      "fs.azure.delegation.token.service.urls";
+  /**
+   * Configuration key to enable http retry policy for delegation token service calls.
+   */
+  public static final String DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY =
+      "fs.azure.delegationtokenservice.http.retry.policy.enabled";
+  /**
+   * Configuration key for delegation token service http retry policy spec.
+   */
+  public static final String DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY =
+      "fs.azure.delegationtokenservice.http.retry.policy.spec";
+  /**
+   * Default remote delegation token manager endpoint.
+   */
+  private static final String DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT =
+      "/tokenmanager/v1";
+  /**
+   * Default for delegation token service http retry policy spec.
+   */
+  private static final String DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
+      "1000,3,10000,2";
+
+  private static final boolean
+      DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = true;
+
+  private static final Text WASB_DT_SERVICE_NAME = new Text("WASB_DT_SERVICE");
+  /**
+   *  Query parameter value for Getting delegation token http request
+   */
+  private static final String GET_DELEGATION_TOKEN_OP = "GETDELEGATIONTOKEN";
+  /**
+   * Query parameter value for renewing delegation token http request
+   */
+  private static final String RENEW_DELEGATION_TOKEN_OP = "RENEWDELEGATIONTOKEN";
+  /**
+   * Query parameter value for canceling the delegation token http request
+   */
+  private static final String CANCEL_DELEGATION_TOKEN_OP = "CANCELDELEGATIONTOKEN";
+  /**
+   * op parameter to represent the operation.
+   */
+  private static final String OP_PARAM_KEY_NAME = "op";
+  /**
+   * renewer parameter to represent the renewer of the delegation token.
+   */
+  private static final String RENEWER_PARAM_KEY_NAME = "renewer";
+  /**
+   * service parameter to represent the service which returns delegation tokens.
+   */
+  private static final String SERVICE_PARAM_KEY_NAME = "service";
+  /**
+   * token parameter to represent the delegation token.
+   */
+  private static final String TOKEN_PARAM_KEY_NAME = "token";
+  private WasbRemoteCallHelper remoteCallHelper;
+  private String[] dtServiceUrls;
+
+  public RemoteWasbDelegationTokenManager(Configuration conf)
+      throws IOException {
+    RetryPolicy retryPolicy = RetryUtils.getMultipleLinearRandomRetry(conf,
+        DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY,
+        DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+        DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
+        DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
+
+    remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, true);
+    this.dtServiceUrls =
+        conf.getTrimmedStrings(KEY_DELEGATION_TOKEN_SERVICE_URLS);
+    if (this.dtServiceUrls == null || this.dtServiceUrls.length <= 0) {
+      throw new IOException(
+          KEY_DELEGATION_TOKEN_SERVICE_URLS + " config not set"
+              + " in configuration.");
+    }
+  }
+
+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(
+      String renewer) throws IOException {
+    URIBuilder uriBuilder =
+        new URIBuilder().setPath(DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT)
+            .addParameter(OP_PARAM_KEY_NAME, GET_DELEGATION_TOKEN_OP)
+            .addParameter(RENEWER_PARAM_KEY_NAME, renewer)
+            .addParameter(SERVICE_PARAM_KEY_NAME, WASB_DT_SERVICE_NAME.toString());
+    String responseBody = remoteCallHelper
+        .makeRemoteRequest(dtServiceUrls, uriBuilder.getPath(),
+            uriBuilder.getQueryParams(), HttpGet.METHOD_NAME);
+    return TokenUtils.toDelegationToken(JsonUtils.parse(responseBody));
+  }
+
+  @Override
+  public long renewDelegationToken(Token<?> token)
+      throws IOException {
+    URIBuilder uriBuilder =
+        new URIBuilder().setPath(DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT)
+            .addParameter(OP_PARAM_KEY_NAME, RENEW_DELEGATION_TOKEN_OP)
+            .addParameter(TOKEN_PARAM_KEY_NAME, token.encodeToUrlString());
+
+    String responseBody = remoteCallHelper
+        .makeRemoteRequest(dtServiceUrls, uriBuilder.getPath(),
+            uriBuilder.getQueryParams(), HttpPut.METHOD_NAME);
+
+    Map<?, ?> parsedResp = JsonUtils.parse(responseBody);
+    return ((Number) parsedResp.get("long")).longValue();
+  }
+
+  @Override
+  public void cancelDelegationToken(Token<?> token)
+      throws IOException {
+    URIBuilder uriBuilder =
+        new URIBuilder().setPath(DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT)
+            .addParameter(OP_PARAM_KEY_NAME, CANCEL_DELEGATION_TOKEN_OP)
+            .addParameter(TOKEN_PARAM_KEY_NAME, token.encodeToUrlString());
+    remoteCallHelper.makeRemoteRequest(dtServiceUrls, uriBuilder.getPath(),
+        uriBuilder.getQueryParams(), HttpPut.METHOD_NAME);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/SecurityUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/SecurityUtils.java
deleted file mode 100644
index 61bf846..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/SecurityUtils.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azure.security;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.azure.RemoteWasbAuthorizerImpl;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Iterator;
-
-/**
- * Security Utils class for WASB.
- */
-public final class SecurityUtils {
-
-  private SecurityUtils() {
-  }
-
-  /**
-   * Utility method to get remote service URLs from the configuration.
-   * @param conf configuration object.
-   * @return remote service URL
-   * @throws UnknownHostException thrown when getting the default value.
-   */
-  public static String getCredServiceUrls(Configuration conf)
-      throws UnknownHostException {
-    return conf.get(Constants.KEY_CRED_SERVICE_URL, String
-        .format("http://%s:%s",
-            InetAddress.getLocalHost().getCanonicalHostName(),
-            Constants.DEFAULT_CRED_SERVICE_PORT));
-  }
-
-  /**
-   * Utility method to get remote Authorization service URLs from the configuration.
-   * @param conf Configuration object.
-   * @return remote Authorization server URL
-   * @throws UnknownHostException thrown when getting the default value.
-   */
-  public static String getRemoteAuthServiceUrls(Configuration conf)
-      throws UnknownHostException {
-    return conf.get(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, String
-        .format("http://%s:%s",
-            InetAddress.getLocalHost().getCanonicalHostName(),
-            Constants.DEFAULT_CRED_SERVICE_PORT));
-  }
-
-  /**
-   * Utility method to get delegation token from the UGI credentials.
-   * @return delegation token
-   * @throws IOException thrown when getting the current user.
-   */
-  public static String getDelegationTokenFromCredentials() throws IOException {
-    String delegationToken = null;
-    Iterator<Token<? extends TokenIdentifier>> tokenIterator = UserGroupInformation
-        .getCurrentUser().getCredentials().getAllTokens().iterator();
-    while (tokenIterator.hasNext()) {
-      Token<? extends TokenIdentifier> iteratedToken = tokenIterator.next();
-      if (iteratedToken.getKind()
-          .equals(WasbDelegationTokenIdentifier.TOKEN_KIND)) {
-        delegationToken = iteratedToken.encodeToUrlString();
-      }
-    }
-    return delegationToken;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/TokenUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/TokenUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/TokenUtils.java
new file mode 100644
index 0000000..90b9082
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/TokenUtils.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.security;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Utility methods common for token management
+ */
+public final class TokenUtils {
+  public static final Logger LOG = LoggerFactory.getLogger(TokenUtils.class);
+  public static final String URL_STRING = "urlString";
+
+  private TokenUtils() {
+  }
+
+  public static Token<DelegationTokenIdentifier> toDelegationToken(
+      final Map<?, ?> inputMap) throws IOException {
+    final Map<?, ?> m = (Map<?, ?>) inputMap.get(Token.class.getSimpleName());
+    return (Token<DelegationTokenIdentifier>) toToken(m);
+  }
+
+  public static Token<? extends TokenIdentifier> toToken(final Map<?, ?> m)
+      throws IOException {
+    if (m == null) {
+      return null;
+    }
+    String urlString = (String) m.get(URL_STRING);
+    if (urlString != null) {
+      final Token<DelegationTokenIdentifier> token = new Token<>();
+      LOG.debug("Read url string param - {}", urlString);
+      token.decodeFromUrlString(urlString);
+      return token;
+    }
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbDelegationTokenManager.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbDelegationTokenManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbDelegationTokenManager.java
new file mode 100644
index 0000000..1d73416
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbDelegationTokenManager.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure.security;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
+
+import java.io.IOException;
+
+/**
+ * Interface for Managing the Delegation tokens.
+ */
+public interface WasbDelegationTokenManager {
+
+  /**
+   * Get Delegation token
+   * @param renewer delegation token renewer
+   * @return delegation token
+   * @throws IOException when error in getting the delegation token
+   */
+  Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
+      throws IOException;
+
+  /**
+   * Renew the delegation token
+   * @param token delegation token.
+   * @return renewed time.
+   * @throws IOException when error in renewing the delegation token
+   */
+  long renewDelegationToken(Token<?> token) throws IOException;
+
+  /**
+   * Cancel the delegation token
+   * @param token delegation token.
+   * @throws IOException when error in cancelling the delegation token.
+   */
+  void cancelDelegationToken(Token<?> token) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbTokenRenewer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbTokenRenewer.java
index 7994bde..6df7647 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbTokenRenewer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/security/WasbTokenRenewer.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,27 +20,19 @@ package org.apache.hadoop.fs.azure.security;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URL;
-import java.security.PrivilegedExceptionAction;
 
 /**
  * Token Renewer for renewing WASB delegation tokens with remote service.
  */
 public class WasbTokenRenewer extends TokenRenewer {
-  public static final Logger LOG = LoggerFactory
-      .getLogger(WasbTokenRenewer.class);
+  public static final Logger LOG =
+      LoggerFactory.getLogger(WasbTokenRenewer.class);
 
   /**
    * Checks if this particular object handles the Kind of token passed.
@@ -75,32 +67,7 @@ public class WasbTokenRenewer extends TokenRenewer {
   public long renew(final Token<?> token, Configuration conf)
       throws IOException, InterruptedException {
     LOG.debug("Renewing the delegation token");
-    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    UserGroupInformation connectUgi = ugi.getRealUser();
-    final UserGroupInformation proxyUser = connectUgi;
-    if (connectUgi == null) {
-      connectUgi = ugi;
-    }
-    connectUgi.checkTGTAndReloginFromKeytab();
-    final DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
-    authToken
-        .setDelegationToken((Token<AbstractDelegationTokenIdentifier>) token);
-    final String credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL,
-        String.format("http://%s:%s",
-            InetAddress.getLocalHost().getCanonicalHostName(),
-            Constants.DEFAULT_CRED_SERVICE_PORT));
-    DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
-    final DelegationTokenAuthenticatedURL authURL = new DelegationTokenAuthenticatedURL(
-        authenticator);
-
-    return connectUgi.doAs(new PrivilegedExceptionAction<Long>() {
-      @Override
-      public Long run() throws Exception {
-        return authURL.renewDelegationToken(new URL(credServiceUrl
-                + Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
-            authToken, (proxyUser != null) ? ugi.getShortUserName() : null);
-      }
-    });
+    return getInstance(conf).renewDelegationToken(token);
   }
 
   /**
@@ -114,31 +81,11 @@ public class WasbTokenRenewer extends TokenRenewer {
   public void cancel(final Token<?> token, Configuration conf)
       throws IOException, InterruptedException {
     LOG.debug("Cancelling the delegation token");
-    final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    UserGroupInformation connectUgi = ugi.getRealUser();
-    final UserGroupInformation proxyUser = connectUgi;
-    if (connectUgi == null) {
-      connectUgi = ugi;
-    }
-    connectUgi.checkTGTAndReloginFromKeytab();
-    final DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
-    authToken
-        .setDelegationToken((Token<AbstractDelegationTokenIdentifier>) token);
-    final String credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL,
-        String.format("http://%s:%s",
-            InetAddress.getLocalHost().getCanonicalHostName(),
-            Constants.DEFAULT_CRED_SERVICE_PORT));
-    DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
-    final DelegationTokenAuthenticatedURL authURL = new DelegationTokenAuthenticatedURL(
-        authenticator);
-    connectUgi.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        authURL.cancelDelegationToken(new URL(credServiceUrl
-                + Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
-            authToken, (proxyUser != null) ? ugi.getShortUserName() : null);
-        return null;
-      }
-    });
+    getInstance(conf).cancelDelegationToken(token);
+  }
+
+  private WasbDelegationTokenManager getInstance(Configuration conf)
+      throws IOException {
+    return new RemoteWasbDelegationTokenManager(conf);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38996fdc/hadoop-tools/hadoop-azure/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 9c57e60..740be52 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -316,12 +316,12 @@ To enable SAS key generation locally following property needs to be set to true.
 </property>
 ```
 
-To use the remote SAS key generation mode, an external REST service is expected to provided required SAS keys.
+To use the remote SAS key generation mode, comma-separated external REST services are expected to provide the required SAS keys.
 Following property can used to provide the end point to use for remote SAS Key generation:
 
 ```xml
 <property>
-  <name>fs.azure.cred.service.url</name>
+  <name>fs.azure.cred.service.urls</name>
   <value>{URL}</value>
 </property>
 ```
@@ -354,11 +354,11 @@ Authorization support can be enabled in WASB using the following configuration:
 ```
 
 The current implementation of authorization relies on the presence of an external service that can enforce
-the authorization. The service is expected to be running on a URL provided by the following config.
+the authorization. The service is expected to be running on the comma-separated URLs provided by the following config.
 
 ```xml
 <property>
-  <name>fs.azure.authorization.remote.service.url</name>
+  <name>fs.azure.authorization.remote.service.urls</name>
   <value>{URL}</value>
 </property>
 ```
@@ -377,6 +377,42 @@ The service is expected to return a response in JSON format:
 }
 ```
 
+### Delegation token support in WASB
+
+Delegation token support can be enabled in WASB using the following configuration:
+
+```xml
+<property>
+  <name>fs.azure.enable.kerberos.support</name>
+  <value>true</value>
+</property>
+```
+
+The current delegation token implementation relies on the presence of external service instances that can generate and manage delegation tokens. The service is expected to be running on the comma-separated URLs provided by the following config.
+
+```xml
+<property>
+  <name>fs.azure.delegation.token.service.urls</name>
+  <value>{URL}</value>
+</property>
+```
+
+The remote service is expected to provide support for the following REST calls: ```{URL}?op=GETDELEGATIONTOKEN```, ```{URL}?op=RENEWDELEGATIONTOKEN``` and ```{URL}?op=CANCELDELEGATIONTOKEN```
+Example requests:
+  ```{URL}?op=GETDELEGATIONTOKEN&renewer=<renewer>```
+  ```{URL}?op=RENEWDELEGATIONTOKEN&token=<delegation token>```
+  ```{URL}?op=CANCELDELEGATIONTOKEN&token=<delegation token>```
+
+The service is expected to return a response in JSON format for GETDELEGATIONTOKEN request:
+
+```json
+{
+    "Token" : {
+        "urlString": URL string of delegation token.
+    }
+}
+```
+
 ## Testing the hadoop-azure Module
 
 The hadoop-azure module includes a full suite of unit tests.  Most of the tests


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message