tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)
Date Mon, 11 May 2015 18:47:29 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 12b6ed9a6 -> 5d95d998c


TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5d95d998
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5d95d998
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5d95d998

Branch: refs/heads/branch-0.6
Commit: 5d95d998c38a965adaa0c7810ecb57520ef9f110
Parents: 12b6ed9
Author: Hitesh Shah <hitesh@apache.org>
Authored: Mon May 11 11:47:18 2015 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Mon May 11 11:47:18 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../tez/dag/api/client/DAGClientImpl.java       |   5 -
 .../dag/api/client/DAGClientTimelineImpl.java   | 188 ++++++++++++++++---
 .../tez/dag/api/client/TestATSHttpClient.java   |   2 +-
 4 files changed, 170 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5d95d998/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 189f27f..5527d3b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
   TEZ-2396. pig-tez-tfile-parser pom is hard coded to depend on 0.6.0-SNAPSHOT version.
   TEZ-2237. Valid events should be sent out when an Output is not started.
   TEZ-2399. Tez UI: add proper dependencies for computed properties
@@ -189,6 +190,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
   TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport a findbugs warning fix from master.
   TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
     Invalid event: T_ATTEMPT_KILLED at KILLED.

http://git-wip-us.apache.org/repos/asf/tez/blob/5d95d998/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index 2f2dbbb..8986c18 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -90,11 +90,6 @@ public class DAGClientImpl extends DAGClient {
             conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
                  TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
 
-    if (UserGroupInformation.isSecurityEnabled()){
-      //TODO: enable ATS integration in kerberos secured cluster - see TEZ-1529
-      isATSEnabled = false;
-    }
-
     realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5d95d998/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index 6cbab80..248212e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -19,13 +19,20 @@
 package org.apache.tez.dag.api.client;
 
 import javax.annotation.Nullable;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.net.HttpURLConnection;
+import java.net.URI;
 import java.net.URL;
-import java.net.URLEncoder;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -50,6 +57,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -81,8 +96,13 @@ public class DAGClientTimelineImpl extends DAGClient {
   private static Client httpClient = null;
   private final ApplicationId appId;
   private final String dagId;
-  private final TezConfiguration conf;
   private final FrameworkClient frameworkClient;
+  private final UserGroupInformation authUgi;
+  private final String doAsUser;
+  private final DelegationTokenAuthenticator authenticator;
+  private final DelegationTokenAuthenticatedURL.Token token;
+  private final ConnectionConfigurator connConfigurator;
+  private final static int DEFAULT_SOCKET_TIMEOUT = 30 * 1000; // 30 seconds
 
   private Map<String, VertexTaskStats> vertexTaskStatsCache = null;
 
@@ -91,10 +111,9 @@ public class DAGClientTimelineImpl extends DAGClient {
 
   public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf,
                                FrameworkClient frameworkClient)
-      throws TezException {
+      throws TezException, IOException {
     this.appId = appId;
     this.dagId = dagId;
-    this.conf = conf;
     this.frameworkClient = frameworkClient;
 
     String scheme;
@@ -111,8 +130,28 @@ public class DAGClientTimelineImpl extends DAGClient {
     }
 
     baseUri = Joiner.on("").join(scheme, webAppAddress, ATSConstants.RESOURCE_URI_BASE);
-  }
 
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    UserGroupInformation realUgi = ugi.getRealUser();
+    if (realUgi != null) {
+      authUgi = realUgi;
+      doAsUser = ugi.getShortUserName();
+    } else {
+      authUgi = ugi;
+      doAsUser = null;
+    }
+
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      authenticator = new KerberosDelegationTokenAuthenticator();
+    } else {
+      authenticator = new PseudoDelegationTokenAuthenticator();
+    }
+
+    connConfigurator = newConnConfigurator(conf);
+    authenticator.setConnectionConfigurator(connConfigurator);
+    token = new DelegationTokenAuthenticatedURL.Token();
+  }
 
   @Override
   public String getExecutionContext() {
@@ -125,7 +164,13 @@ public class DAGClientTimelineImpl extends DAGClient {
     try {
       appReport = frameworkClient.getApplicationReport(appId);
     } catch (YarnException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("error getting application report", e);
+      }
     } catch (IOException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("error getting application report", e);
+      }
     }
     return appReport;
   }
@@ -412,8 +457,13 @@ public class DAGClientTimelineImpl extends DAGClient {
           .type(MediaType.APPLICATION_JSON_TYPE)
           .get(ClientResponse.class);
 
-      if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
-        throw new TezException("Failed to get response from YARN Timeline: url: " + url);
+      final ClientResponse.Status clientResponseStatus = response.getClientResponseStatus();
+      if (clientResponseStatus != ClientResponse.Status.OK) {
+        if (clientResponseStatus == ClientResponse.Status.UNAUTHORIZED) {
+          httpClient = null;
+        }
+        throw new TezException("Failed to get response from YARN Timeline: url: " + url +
+          " error: " + clientResponseStatus);
       }
 
       return response.getEntity(JSONObject.class);
@@ -423,6 +473,8 @@ public class DAGClientTimelineImpl extends DAGClient {
       throw new TezException("Error accessing content from YARN Timeline - unexpected response", e);
     } catch (IllegalArgumentException e) {
       throw new TezException("Error accessing content from YARN Timeline - invalid url", e);
+    } catch (IOException e) {
+      throw new TezException("Error getting http client connection", e);
     }
   }
 
@@ -460,15 +512,119 @@ public class DAGClientTimelineImpl extends DAGClient {
     }
   }
 
-  protected Client getHttpClient() {
+  protected Client getHttpClient() throws IOException, TezException {
     if (httpClient == null) {
-      ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
-      HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
-      httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
+      if (UserGroupInformation.isSecurityEnabled()) {
+        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        try {
+          final Token<?> delegationToken = getDelegationToken(currentUser.getUserName());
+          currentUser.addToken(delegationToken);
+        } catch (UndeclaredThrowableException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("exception getting httpclient token", e);
+          }
+        }
+      }
+
+      ClientConfig clientConfig = new DefaultClientConfig(JSONRootElementProvider.App.class);
+      HttpURLConnectionFactory connectionFactory = new TimelineURLConnectionFactory();
+      httpClient = new Client(new URLConnectionClientHandler(connectionFactory), clientConfig);
     }
     return httpClient;
   }
 
+  private Token<?> getDelegationToken(final String renewer) throws
+      IOException, TezException {
+    authUgi.checkTGTAndReloginFromKeytab();
+    try {
+      return authUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
+        @Override
+        public Token<?> run() throws IOException, AuthenticationException {
+          try {
+            URI resURI = URI.create(baseUri);
+            DelegationTokenAuthenticatedURL authUrl =
+                new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
+            return (Token) authUrl.getDelegationToken(resURI.toURL(), token, renewer, doAsUser);
+          } catch (IllegalArgumentException e) {
+            throw new IOException("invalid url " + baseUri, e);
+          }
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new TezException(e);
+    }
+  }
+
+  private class TimelineURLConnectionFactory implements HttpURLConnectionFactory {
+
+    @Override
+    public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
+      try {
+        return new DelegationTokenAuthenticatedURL(
+            authenticator, connConfigurator).openConnection(url, token,
+            doAsUser);
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      } catch (AuthenticationException ae) {
+        throw new IOException(ae);
+      }
+    }
+
+  }
+
+  private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
+    try {
+      return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
+    } catch (Exception e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cannot load customized ssl related configuration. " +
+            "Fallback to system-generic settings.", e);
+      }
+      return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+    }
+  }
+
+  private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
+      new ConnectionConfigurator() {
+        @Override
+        public HttpURLConnection configure(HttpURLConnection conn)
+            throws IOException {
+          setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
+          return conn;
+        }
+      };
+
+  private static ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf)
+      throws IOException, GeneralSecurityException {
+    final SSLFactory factory;
+    final SSLSocketFactory sf;
+    final HostnameVerifier hv;
+
+    factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+    factory.init();
+    sf = factory.createSSLSocketFactory();
+    hv = factory.getHostnameVerifier();
+
+    return new ConnectionConfigurator() {
+      @Override
+      public HttpURLConnection configure(HttpURLConnection conn)
+          throws IOException {
+        if (conn instanceof HttpsURLConnection) {
+          HttpsURLConnection c = (HttpsURLConnection) conn;
+          c.setSSLSocketFactory(sf);
+          c.setHostnameVerifier(hv);
+        }
+        setTimeouts(conn, timeout);
+        return conn;
+      }
+    };
+  }
+
+  private static void setTimeouts(URLConnection connection, int socketTimeout) {
+    connection.setConnectTimeout(socketTimeout);
+    connection.setReadTimeout(socketTimeout);
+  }
+
   private static final Map<String, DAGStatusStateProto> dagStateProtoMap =
       Collections.unmodifiableMap(new HashMap<String, DAGStatusStateProto>() {{
         put("NEW", DAGStatusStateProto.DAG_SUBMITTED);
@@ -495,14 +651,4 @@ public class DAGClientTimelineImpl extends DAGClient {
         put("TERMINATING", VertexStatusStateProto.VERTEX_TERMINATING);
       }});
 
-
-  class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
-    @Override
-    public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
-      String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
-          URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
-      return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5d95d998/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
index a72b799..aafc28f 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
@@ -53,7 +53,7 @@ public class TestATSHttpClient {
   }
 
   @Test(timeout = 5000)
-  public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException {
+  public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException, IOException {
     ApplicationId mockAppId = mock(ApplicationId.class);
     DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mockAppId, "EXAMPLE_DAG_ID",
         new TezConfiguration(), null);


Mime
View raw message