tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prak...@apache.org
Subject tez git commit: TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (pramachandran)
Date Wed, 27 May 2015 15:50:14 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 c1d334b4d -> 493942d29


TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (pramachandran)

(cherry picked from commit 8710df0d1264a453218220ed48e5d2b5d2923da1)

Conflicts:
	CHANGES.txt
	tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/493942d2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/493942d2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/493942d2

Branch: refs/heads/branch-0.6
Commit: 493942d293eb5b4baa29b7dc2db9db2f6a96af2e
Parents: c1d334b
Author: Prakash Ramachandran <pramachandran@hortonworks.com>
Authored: Wed May 27 17:59:17 2015 +0530
Committer: Prakash Ramachandran <pramachandran@hortonworks.com>
Committed: Wed May 27 21:19:02 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   9 +-
 .../org/apache/tez/common/ReflectionUtils.java  |  19 +
 .../tez/dag/api/client/DAGClientImpl.java       |   9 +-
 .../dag/api/client/DAGClientTimelineImpl.java   |  56 ++-
 .../dag/api/client/TimelineReaderFactory.java   | 387 +++++++++++++++++++
 .../tez/dag/api/client/TestATSHttpClient.java   |   6 +-
 .../api/client/TestTimelineReaderFactory.java   |  91 +++++
 7 files changed, 536 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/493942d2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27c3429..da6ba98 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,14 @@
 Apache Tez Change Log
 =====================
 
-Release 0.6.1: Unreleased
+Release 0.6.2: Unreleased
+
+INCOMPATIBLE CHANGES
+
+ALL CHANGES:
+  TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster
+
+Release 0.6.1: 2015-05-18
 
 INCOMPATIBLE CHANGES
 

http://git-wip-us.apache.org/repos/asf/tez/blob/493942d2/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
index 0fc529b..f1eb0ae 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
@@ -100,6 +100,25 @@ public class ReflectionUtils {
   }
 
   @Private
+  @SuppressWarnings("unchecked")
+  public static <T> T invokeMethod(Object target, Method method, Object... args) {
+    try {
+      return (T) method.invoke(target, args);
+    } catch (Exception e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  @Private
+  public static Method getMethod(Class<?> targetClazz, String methodName, Class<?>...
parameterTypes) {
+    try {
+      return targetClazz.getMethod(methodName, parameterTypes);
+    } catch (NoSuchMethodException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  @Private
   public static synchronized void addResourcesToClasspath(List<URL> urls) {
     ClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Thread
         .currentThread().getContextClassLoader());

http://git-wip-us.apache.org/repos/asf/tez/blob/493942d2/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index 2f2dbbb..1360e2d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -90,10 +89,7 @@ public class DAGClientImpl extends DAGClient {
             conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
                  TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
 
-    if (UserGroupInformation.isSecurityEnabled()){
-      //TODO: enable ATS integration in kerberos secured cluster - see TEZ-1529
-      isATSEnabled = false;
-    }
+    isATSEnabled = DAGClientTimelineImpl.isSupported();
 
     realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
   }
@@ -447,7 +443,8 @@ public class DAGClientImpl extends DAGClient {
 
   private void switchToTimelineClient() throws IOException, TezException {
     realClient.close();
-    realClient = new DAGClientTimelineImpl(appId, dagId, conf, frameworkClient);
+    realClient = new DAGClientTimelineImpl(appId, dagId, conf, frameworkClient,
+        (int) (2 * PRINT_STATUS_INTERVAL_MILLIS));
     if (LOG.isDebugEnabled()) {
       LOG.debug("dag completed switching to DAGClientTimelineImpl");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/493942d2/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index 6cbab80..8fef2d5 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -23,9 +23,6 @@ import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,16 +37,10 @@ import com.sun.jersey.api.client.ClientHandlerException;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.UniformInterfaceException;
 import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -78,10 +69,10 @@ public class DAGClientTimelineImpl extends DAGClient {
   private static final String FILTER_BY_FIELDS = "primaryfilters,otherinfo";
   private static final String HTTPS_SCHEME = "https://";
   private static final String HTTP_SCHEME = "http://";
-  private static Client httpClient = null;
+  private Client httpClient = null;
+  private final TimelineReaderFactory.TimelineReaderStrategy timelineReaderStrategy;
   private final ApplicationId appId;
   private final String dagId;
-  private final TezConfiguration conf;
   private final FrameworkClient frameworkClient;
 
   private Map<String, VertexTaskStats> vertexTaskStatsCache = null;
@@ -90,16 +81,21 @@ public class DAGClientTimelineImpl extends DAGClient {
   protected String baseUri;
 
   public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf,
-                               FrameworkClient frameworkClient)
+                               FrameworkClient frameworkClient, int connTimeout)
       throws TezException {
+
+    if (!TimelineReaderFactory.isTimelineClientSupported()) {
+      throw new TezException("Reading from secure timeline is supported only for hadoop 2.6
and above.");
+    }
+
     this.appId = appId;
     this.dagId = dagId;
-    this.conf = conf;
     this.frameworkClient = frameworkClient;
 
     String scheme;
     String webAppAddress;
-    if (webappHttpsOnly(conf)) {
+    boolean useHttps = webappHttpsOnly(conf);
+    if (useHttps) {
       scheme = HTTPS_SCHEME;
       webAppAddress = conf.get(ATSConstants.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME);
     } else {
@@ -111,8 +107,14 @@ public class DAGClientTimelineImpl extends DAGClient {
     }
 
     baseUri = Joiner.on("").join(scheme, webAppAddress, ATSConstants.RESOURCE_URI_BASE);
+
+    timelineReaderStrategy =
+        TimelineReaderFactory.getTimelineReaderStrategy(conf, useHttps, connTimeout);
   }
 
+  public static boolean isSupported() {
+    return TimelineReaderFactory.isTimelineClientSupported();
+  }
 
   @Override
   public String getExecutionContext() {
@@ -407,13 +409,15 @@ public class DAGClientTimelineImpl extends DAGClient {
   @VisibleForTesting
   protected JSONObject getJsonRootEntity(String url) throws TezException {
     try {
-      WebResource wr = getHttpClient().resource(url);
+      WebResource wr = getCachedHttpClient().resource(url);
       ClientResponse response = wr.accept(MediaType.APPLICATION_JSON_TYPE)
           .type(MediaType.APPLICATION_JSON_TYPE)
           .get(ClientResponse.class);
 
-      if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
-        throw new TezException("Failed to get response from YARN Timeline: url: " + url);
+      final ClientResponse.Status clientResponseStatus = response.getClientResponseStatus();
+      if (clientResponseStatus != ClientResponse.Status.OK) {
+        throw new TezException("Failed to get response from YARN Timeline:" +
+            " errorCode:" + clientResponseStatus + ", url:" + url);
       }
 
       return response.getEntity(JSONObject.class);
@@ -423,6 +427,8 @@ public class DAGClientTimelineImpl extends DAGClient {
       throw new TezException("Error accessing content from YARN Timeline - unexpected response",
e);
     } catch (IllegalArgumentException e) {
       throw new TezException("Error accessing content from YARN Timeline - invalid url",
e);
+    } catch (IOException e) {
+      throw new TezException("Error failed to get http client", e);
     }
   }
 
@@ -460,11 +466,9 @@ public class DAGClientTimelineImpl extends DAGClient {
     }
   }
 
-  protected Client getHttpClient() {
+  protected Client getCachedHttpClient() throws IOException {
     if (httpClient == null) {
-      ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
-      HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
-      httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
+      httpClient = timelineReaderStrategy.getHttpClient();
     }
     return httpClient;
   }
@@ -495,14 +499,4 @@ public class DAGClientTimelineImpl extends DAGClient {
         put("TERMINATING", VertexStatusStateProto.VERTEX_TERMINATING);
       }});
 
-
-  class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
-    @Override
-    public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
-      String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
-          URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
-      return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/493942d2/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
new file mode 100644
index 0000000..f544198
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client;
+
+import static org.apache.hadoop.security.ssl.SSLFactory.Mode.CLIENT;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.security.GeneralSecurityException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ *  TimelineReaderFactory getTimelineReaderStrategy returns a Strategy class, which is used
to
+ *  create a httpclient, configured for the appropriate runtime.
+ *
+ *  on hadoop 2.6+ the factory returns TimelineReaderTokenAuthenticatedStrategy, which supports
+ *  kerberos based auth (secure cluster) or psuedo auth (un-secure cluster).
+ *
+ *  on hadoop 2.4 where the token delegation auth is not supported, TimelineReaderPseudoAuthenticatedStrategy
+ *  is used which supports only unsecure timeline.
+ *
+ */
+@InterfaceAudience.Private
+public class TimelineReaderFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TimelineReaderFactory.class);
+
+  private static final String KERBEROS_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME =
+      "org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator";
+  private static final String PSEUDO_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME =
+      "org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator";
+  private static final String DELEGATION_TOKEN_AUTHENTICATED_URL_CLAZZ_NAME =
+      "org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL";
+  private static final String DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME =
+      "org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator";
+  private static final String DELEGATION_TOKEN_AUTHENTICATED_URL_TOKEN_CLASS_NAME =
+      "org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL$Token";
+
+  private static Class<?> delegationTokenAuthenticatorClazz = null;
+  private static Method delegationTokenAuthenticateURLOpenConnectionMethod = null;
+
+  public static TimelineReaderStrategy getTimelineReaderStrategy(Configuration conf,
+                                                                 boolean useHttps,
+                                                                 int connTimeout) throws
TezException {
+
+    TimelineReaderStrategy timelineReaderStrategy;
+
+    if (!isTimelineClientSupported()) {
+      throw new TezException("Reading from timeline is not supported." +
+          " token delegation support: " + tokenDelegationSupported() +
+          ", is secure timeline: " + UserGroupInformation.isSecurityEnabled());
+    }
+
+    timelineReaderStrategy = getTimelineReaderStrategy(tokenDelegationSupported(), conf,
useHttps,
+        connTimeout);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using " + timelineReaderStrategy.getClass().getName() + " to read timeline
data");
+    }
+
+    return timelineReaderStrategy;
+  }
+
+  private static TimelineReaderStrategy getTimelineReaderStrategy(boolean isTokenDelegationSupported,
+                                                                    Configuration conf,
+                                                                    boolean useHttps,
+                                                                    int connTimeout) {
+    TimelineReaderStrategy timelineReaderStrategy;
+
+    if (isTokenDelegationSupported) {
+      timelineReaderStrategy =
+          new TimelineReaderTokenAuthenticatedStrategy(conf, useHttps, connTimeout);
+    } else {
+      timelineReaderStrategy =
+          new TimelineReaderPseudoAuthenticatedStrategy(conf, useHttps, connTimeout);
+    }
+
+    return timelineReaderStrategy;
+  }
+
+  /**
+   * Check if timeline client can be supported.
+   *
+   * @return boolean value indicating if timeline client to read data is supported.
+   */
+  public static boolean isTimelineClientSupported() {
+    // support to read data from timeline is based on the version of hadoop.
+    // reads are supported for non-secure cluster from hadoop 2.4 and up.
+    // reads are supported for secure cluster only from hadoop 2.6. check the presence of
the classes
+    // required upfront if security is enabled.
+    return !UserGroupInformation.isSecurityEnabled() || tokenDelegationSupported();
+  }
+
+  public interface TimelineReaderStrategy {
+    Client getHttpClient() throws IOException;
+  }
+
+  /*
+   * auth strategy for secured and unsecured environment with delegation token (hadoop 2.6
and above)
+   */
+  private static class TimelineReaderTokenAuthenticatedStrategy implements TimelineReaderStrategy
{
+    private final Configuration conf;
+    private final boolean useHttps;
+    private final int connTimeout;
+
+    public TimelineReaderTokenAuthenticatedStrategy(final Configuration conf,
+                                                    final boolean useHttps,
+                                                    final int connTimeout) {
+
+      this.conf = conf;
+      this.useHttps = useHttps;
+      this.connTimeout = connTimeout;
+    }
+
+    @Override
+    public Client getHttpClient() throws IOException {
+      Authenticator authenticator;
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      UserGroupInformation realUgi = ugi.getRealUser();
+      UserGroupInformation authUgi;
+      String doAsUser;
+      ClientConfig clientConfig = new DefaultClientConfig(JSONRootElementProvider.App.class);
+      ConnectionConfigurator connectionConfigurator = getNewConnectionConf(conf, useHttps,
+          connTimeout);
+
+      try {
+        authenticator = getTokenAuthenticator();
+        authenticator.setConnectionConfigurator(connectionConfigurator);
+      } catch (TezUncheckedException e) {
+        throw new IOException("Failed to get authenticator", e);
+      }
+
+      if (realUgi != null) {
+        authUgi = realUgi;
+        doAsUser = ugi.getShortUserName();
+      } else {
+        authUgi = ugi;
+        doAsUser = null;
+      }
+
+      HttpURLConnectionFactory connectionFactory =
+          new TokenAuthenticatedURLConnectionFactory(connectionConfigurator, authenticator,
+              authUgi, doAsUser);
+      return new Client(new URLConnectionClientHandler(connectionFactory), clientConfig);
+    }
+
+    private static Authenticator getTokenAuthenticator() {
+      String authenticatorClazzName;
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        authenticatorClazzName = KERBEROS_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME;
+      } else {
+        authenticatorClazzName = PSEUDO_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME;
+      }
+
+      return ReflectionUtils.createClazzInstance(authenticatorClazzName);
+    }
+
+    private static class TokenAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory
{
+
+      private final Authenticator authenticator;
+      private final ConnectionConfigurator connConfigurator;
+      private final UserGroupInformation authUgi;
+      private final String doAsUser;
+      private final AuthenticatedURL.Token token;
+
+      public TokenAuthenticatedURLConnectionFactory(ConnectionConfigurator connConfigurator,
+                                                    Authenticator authenticator,
+                                                    UserGroupInformation authUgi,
+                                                    String doAsUser) {
+        this.connConfigurator = connConfigurator;
+        this.authenticator = authenticator;
+        this.authUgi = authUgi;
+        this.doAsUser = doAsUser;
+        this.token = ReflectionUtils.createClazzInstance(
+            DELEGATION_TOKEN_AUTHENTICATED_URL_TOKEN_CLASS_NAME, null, null);
+      }
+
+      @Override
+      public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+        try {
+          AuthenticatedURL authenticatedURL= ReflectionUtils.createClazzInstance(
+              DELEGATION_TOKEN_AUTHENTICATED_URL_CLAZZ_NAME, new Class[] {
+              delegationTokenAuthenticatorClazz,
+              ConnectionConfigurator.class
+          }, new Object[] {
+              authenticator,
+              connConfigurator
+          });
+          return ReflectionUtils.invokeMethod(authenticatedURL,
+              delegationTokenAuthenticateURLOpenConnectionMethod, url, token, doAsUser);
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+    }
+  }
+
+  /*
+   * Pseudo auth strategy for env where delegation token is not supported (hadoop 2.4)
+   */
+  @VisibleForTesting
+  protected static class TimelineReaderPseudoAuthenticatedStrategy implements TimelineReaderStrategy
{
+
+    private final ConnectionConfigurator connectionConf;
+
+    public TimelineReaderPseudoAuthenticatedStrategy(final Configuration conf,
+                                                     final boolean useHttps,
+                                                     final int connTimeout) {
+      connectionConf = getNewConnectionConf(conf, useHttps, connTimeout);
+    }
+
+    @Override
+    public Client getHttpClient() {
+      ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
+      HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory(connectionConf);
+      Client httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
+      return httpClient;
+    }
+
+    @VisibleForTesting
+    protected static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory
{
+      private final ConnectionConfigurator connectionConf;
+
+      public PseudoAuthenticatedURLConnectionFactory(ConnectionConfigurator connectionConf)
{
+        this.connectionConf = connectionConf;
+      }
+
+      @Override
+      public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+        String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
+            URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
+
+        HttpURLConnection httpURLConnection =
+            (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
+        this.connectionConf.configure(httpURLConnection);
+
+        return httpURLConnection;
+      }
+    }
+  }
+
+  private static ConnectionConfigurator getNewConnectionConf(final Configuration conf,
+                                                             final boolean useHttps,
+                                                             final int connTimeout) {
+    ConnectionConfigurator connectionConf = null;
+    if (useHttps) {
+      try {
+        connectionConf = getNewSSLConnectionConf(conf, connTimeout);
+      } catch (IOException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cannot load customized ssl related configuration."
+              + " Falling back to system-generic settings.", e);
+        }
+      }
+    }
+
+    if (connectionConf == null) {
+      connectionConf = new ConnectionConfigurator() {
+        @Override
+        public HttpURLConnection configure(HttpURLConnection httpURLConnection) throws IOException
{
+          setTimeouts(httpURLConnection, connTimeout);
+          return httpURLConnection;
+        }
+      };
+    }
+
+    return connectionConf;
+  }
+
+  private static ConnectionConfigurator getNewSSLConnectionConf(final Configuration conf,
+                                                                final int connTimeout)
+      throws IOException {
+    final SSLFactory sslFactory;
+    final SSLSocketFactory sslSocketFactory;
+    final HostnameVerifier hostnameVerifier;
+
+    sslFactory = new SSLFactory(CLIENT, conf);
+    try {
+      sslFactory.init();
+      sslSocketFactory = sslFactory.createSSLSocketFactory();
+    } catch (GeneralSecurityException e) {
+      sslFactory.destroy();
+      throw new IOException("Failed to initialize ssl factory");
+    }
+    hostnameVerifier = sslFactory.getHostnameVerifier();
+
+    return new ConnectionConfigurator() {
+      @Override
+      public HttpURLConnection configure(HttpURLConnection httpURLConnection) throws IOException
{
+        if (!(httpURLConnection instanceof HttpsURLConnection)) {
+          throw new IOException("Expected https connection");
+        }
+        HttpsURLConnection httpsURLConnection = (HttpsURLConnection) httpURLConnection;
+        httpsURLConnection.setSSLSocketFactory(sslSocketFactory);
+        httpsURLConnection.setHostnameVerifier(hostnameVerifier);
+        setTimeouts(httpsURLConnection, connTimeout);
+
+        return httpsURLConnection;
+      }
+    };
+  }
+
+  private static void setTimeouts(HttpURLConnection httpURLConnection, int connTimeout) {
+    httpURLConnection.setConnectTimeout(connTimeout);
+    httpURLConnection.setReadTimeout(connTimeout);
+  }
+
+  private static boolean isTokenDelegationSupportChecksDone = false;
+  private static boolean isTokenDelegationClassesPresent = false;
+
+  // Check if all the classes required for doing token authentication are present. These
classes
+  // are present only from hadoop 2.6 onwards.
+  private static synchronized boolean tokenDelegationSupported() {
+
+    if (!isTokenDelegationSupportChecksDone) {
+
+      isTokenDelegationSupportChecksDone = true;
+
+      try {
+        ReflectionUtils.getClazz(KERBEROS_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME);
+        ReflectionUtils.getClazz(PSEUDO_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME);
+
+        delegationTokenAuthenticatorClazz =
+            ReflectionUtils.getClazz(DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME);
+
+        Class<?> delegationTokenAuthenticatedURLClazz =
+            ReflectionUtils.getClazz(DELEGATION_TOKEN_AUTHENTICATED_URL_CLAZZ_NAME);
+
+        Class<?> delegationTokenAuthenticatedURLTokenClazz =
+            ReflectionUtils.getClazz(DELEGATION_TOKEN_AUTHENTICATED_URL_TOKEN_CLASS_NAME);
+
+        delegationTokenAuthenticateURLOpenConnectionMethod =
+            ReflectionUtils.getMethod(delegationTokenAuthenticatedURLClazz, "openConnection",
+                URL.class, delegationTokenAuthenticatedURLTokenClazz, String.class);
+
+        isTokenDelegationClassesPresent = true;
+
+      } catch (TezUncheckedException e) {
+        LOG.info("Could not find class required for token delegation, will fallback to pseudo
auth");
+      }
+    }
+
+    return isTokenDelegationClassesPresent;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/493942d2/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
index a72b799..ef1b0a5 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
@@ -56,7 +56,7 @@ public class TestATSHttpClient {
   public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException {
     ApplicationId mockAppId = mock(ApplicationId.class);
     DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mockAppId, "EXAMPLE_DAG_ID",
-        new TezConfiguration(), null);
+        new TezConfiguration(), null, 0);
     DAGClientTimelineImpl spyClient = spy(httpClient);
     spyClient.baseUri = "http://yarn.ats.webapp/ws/v1/timeline";
     final String expectedDagUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_DAG_ID/EXAMPLE_DAG_ID"
+
@@ -80,7 +80,7 @@ public class TestATSHttpClient {
   public void testGetDagStatusSimple() throws TezException, JSONException, IOException {
     DAGClientTimelineImpl
         httpClient = new DAGClientTimelineImpl(mock(ApplicationId.class),"EXAMPLE_DAG_ID",
-        new TezConfiguration(), null);
+        new TezConfiguration(), null, 0);
     DAGClientTimelineImpl spyClient = spy(httpClient);
     spyClient.baseUri = "http://yarn.ats.webapp/ws/v1/timeline";
     final String expectedDagUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_DAG_ID/EXAMPLE_DAG_ID"
+
@@ -140,7 +140,7 @@ public class TestATSHttpClient {
   public void testGetVertexStatusSimple() throws JSONException, TezException, IOException
{
     DAGClientTimelineImpl
         httpClient = new DAGClientTimelineImpl(mock(ApplicationId.class), "EXAMPLE_DAG_ID",
-        new TezConfiguration(), null);
+        new TezConfiguration(), null, 0);
     DAGClientTimelineImpl spyClient = spy(httpClient);
     spyClient.baseUri = "http://yarn.ats.webapp/ws/v1/timeline";
     final String expectedVertexUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_VERTEX_ID"
+

http://git-wip-us.apache.org/repos/asf/tez/blob/493942d2/tez-api/src/test/java/org/apache/tez/dag/api/client/TestTimelineReaderFactory.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestTimelineReaderFactory.java
b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestTimelineReaderFactory.java
new file mode 100644
index 0000000..4aff0ca
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestTimelineReaderFactory.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client;
+
+import static org.mockito.Mockito.mock;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.TimelineReaderFactory.TimelineReaderPseudoAuthenticatedStrategy;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTimelineReaderFactory {
+
+  @Before
+  public void setup() {
+    // Disable tests if hadoop version is less than 2.4.0
+    // as Timeline is not supported in 2.2.x or 2.3.x
+    String hadoopVersion = System.getProperty("tez.hadoop.version");
+    Assume.assumeFalse(hadoopVersion.startsWith("2.2.") || hadoopVersion.startsWith("2.3."));
+  }
+
+  // ensure on hadoop 2.4 TimelinePseudoAuthenticatedStrategy is used.
+  @Test(timeout = 5000)
+  public void testShouldUsePseudoAuthStrategyForHadoop24() throws TezException {
+    String hadoopVersion = System.getProperty("tez.hadoop.version");
+    Assume.assumeTrue(hadoopVersion.startsWith("2.4.") || hadoopVersion.startsWith("2.5."));
+
+    String returnedClassName =
+        TimelineReaderFactory.getTimelineReaderStrategy(mock(Configuration.class), false,
0)
+            .getClass()
+            .getCanonicalName();
+    Assert.assertEquals("should use pseudo auth on hadoop2.4",
+        "org.apache.tez.dag.api.client.TimelineReaderFactory.TimelineReaderPseudoAuthenticatedStrategy",
+        returnedClassName);
+  }
+
+  // ensure on hadoop 2.6+ TimelineReaderTokenAuthenticatedStrategy is used.
+  @Test(timeout = 5000)
+  public void testShouldUseTokenDelegationAuthStrategyForHadoop26() throws TezException {
+    String hadoopVersion = System.getProperty("tez.hadoop.version");
+    Assume.assumeFalse(hadoopVersion.startsWith("2.2.") ||
+        hadoopVersion.startsWith("2.3.") ||
+            hadoopVersion.startsWith("2.4.") ||
+            hadoopVersion.startsWith("2.5."));
+
+    String returnedClassName =
+        TimelineReaderFactory.getTimelineReaderStrategy(mock(Configuration.class), false,
0)
+            .getClass()
+            .getCanonicalName();
+    Assert.assertEquals("should use pseudo auth on hadoop2.4",
+        "org.apache.tez.dag.api.client.TimelineReaderFactory.TimelineReaderTokenAuthenticatedStrategy",
+        returnedClassName);
+  }
+
+  @Test(timeout = 5000)
+  public void testPseudoAuthenticatorConnectionUrlShouldHaveUserName() throws Exception {
+    ConnectionConfigurator connConf = mock(ConnectionConfigurator.class);
+    TimelineReaderPseudoAuthenticatedStrategy.PseudoAuthenticatedURLConnectionFactory
+        connectionFactory = new TimelineReaderPseudoAuthenticatedStrategy
+          .PseudoAuthenticatedURLConnectionFactory(connConf);
+    String inputUrl = "http://host:8080/path";
+    String expectedUrl = inputUrl + "?user.name=" + UserGroupInformation.getCurrentUser().getShortUserName();
+    HttpURLConnection httpURLConnection = connectionFactory.getHttpURLConnection(new URL(inputUrl));
+    Assert.assertEquals(expectedUrl, httpURLConnection.getURL().toString());
+  }
+
+}


Mime
View raw message