Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 44EF318143 for ; Thu, 14 May 2015 00:30:37 +0000 (UTC) Received: (qmail 24043 invoked by uid 500); 14 May 2015 00:30:37 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 24006 invoked by uid 500); 14 May 2015 00:30:37 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 23939 invoked by uid 99); 14 May 2015 00:30:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 May 2015 00:30:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0A04EE0779; Thu, 14 May 2015 00:30:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjffdu@apache.org To: commits@tez.apache.org Date: Thu, 14 May 2015 00:30:38 -0000 Message-Id: <329d0ed0bf8d4158b2554d71d5798ba7@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] tez git commit: Revert "TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)" Revert "TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)" This reverts commit 5d95d998c38a965adaa0c7810ecb57520ef9f110. Conflicts: CHANGES.txt Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d72cd949 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d72cd949 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d72cd949 Branch: refs/heads/branch-0.6 Commit: d72cd949d52febc741f102d59413d6abe889df2b Parents: ba9e3f9 Author: Jeff Zhang Authored: Thu May 14 08:30:25 2015 +0800 Committer: Jeff Zhang Committed: Thu May 14 08:30:25 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 2 - .../tez/dag/api/client/DAGClientImpl.java | 5 + .../dag/api/client/DAGClientTimelineImpl.java | 188 +++---------------- .../tez/dag/api/client/TestATSHttpClient.java | 2 +- 4 files changed, 27 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d72cd949/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c2f4ae4..f16d278 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,7 +8,6 @@ INCOMPATIBLE CHANGES ALL CHANGES: TEZ-2057. tez-dag/pom.xml contains versions for dependencies. TEZ-2282. Delimit reused yarn container logs (stderr, stdout, syslog) with task attempt start/stop events - TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. TEZ-2396. pig-tez-tfile-parser pom is hard coded to depend on 0.6.0-SNAPSHOT version. TEZ-2237. Valid events should be sent out when an Output is not started. TEZ-2399. Tez UI: add proper dependencies for computed properties @@ -192,7 +191,6 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: - TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport a findbugs warning fix from master. TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. http://git-wip-us.apache.org/repos/asf/tez/blob/d72cd949/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index 8986c18..2f2dbbb 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -90,6 +90,11 @@ public class DAGClientImpl extends DAGClient { conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT); + if (UserGroupInformation.isSecurityEnabled()){ + //TODO: enable ATS integration in kerberos secured cluster - see TEZ-1529 + isATSEnabled = false; + } + realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient); } http://git-wip-us.apache.org/repos/asf/tez/blob/d72cd949/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java index 248212e..6cbab80 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java @@ -19,20 +19,13 @@ package org.apache.tez.dag.api.client; import javax.annotation.Nullable; -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLSocketFactory; import javax.ws.rs.core.MediaType; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.lang.reflect.UndeclaredThrowableException; import java.net.HttpURLConnection; -import java.net.URI; import java.net.URL; -import java.net.URLConnection; -import java.security.GeneralSecurityException; -import java.security.PrivilegedExceptionAction; +import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -57,14 +50,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; -import org.apache.hadoop.security.ssl.SSLFactory; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; -import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; -import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; -import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -96,13 +81,8 @@ public class DAGClientTimelineImpl extends DAGClient { private static Client httpClient = null; private final ApplicationId appId; private final String dagId; + private final TezConfiguration conf; private final FrameworkClient frameworkClient; - private final UserGroupInformation authUgi; - private final String doAsUser; - private final DelegationTokenAuthenticator authenticator; - private final DelegationTokenAuthenticatedURL.Token token; - private final ConnectionConfigurator connConfigurator; - private final static int DEFAULT_SOCKET_TIMEOUT = 30 * 1000; // 30 seconds private Map vertexTaskStatsCache = null; @@ -111,9 +91,10 @@ public class DAGClientTimelineImpl extends DAGClient { public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf, FrameworkClient frameworkClient) - throws TezException, IOException { + throws TezException { this.appId = appId; this.dagId = dagId; + this.conf = conf; this.frameworkClient = frameworkClient; String scheme; @@ -130,29 +111,9 @@ public class DAGClientTimelineImpl extends DAGClient { } baseUri = Joiner.on("").join(scheme, webAppAddress, ATSConstants.RESOURCE_URI_BASE); - - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - UserGroupInformation realUgi = ugi.getRealUser(); - if (realUgi != null) { - authUgi = realUgi; - doAsUser = ugi.getShortUserName(); - } else { - authUgi = ugi; - doAsUser = null; - } - - - if (UserGroupInformation.isSecurityEnabled()) { - authenticator = new KerberosDelegationTokenAuthenticator(); - } else { - authenticator = new PseudoDelegationTokenAuthenticator(); - } - - connConfigurator = newConnConfigurator(conf); - authenticator.setConnectionConfigurator(connConfigurator); - token = new DelegationTokenAuthenticatedURL.Token(); } + @Override public String getExecutionContext() { return "Executing on YARN cluster with App id " + appId; @@ -164,13 +125,7 @@ public class DAGClientTimelineImpl extends DAGClient { try { appReport = frameworkClient.getApplicationReport(appId); } catch (YarnException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("error getting application report", e); - } } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("error getting application report", e); - } } return appReport; } @@ -457,13 +412,8 @@ public class DAGClientTimelineImpl extends DAGClient { .type(MediaType.APPLICATION_JSON_TYPE) .get(ClientResponse.class); - final ClientResponse.Status clientResponseStatus = response.getClientResponseStatus(); - if (clientResponseStatus != ClientResponse.Status.OK) { - if (clientResponseStatus == ClientResponse.Status.UNAUTHORIZED) { - httpClient = null; - } - throw new TezException("Failed to get response from YARN Timeline: url: " + url + - " error: " + clientResponseStatus); + if (response.getClientResponseStatus() != ClientResponse.Status.OK) { + throw new TezException("Failed to get response from YARN Timeline: url: " + url); } return response.getEntity(JSONObject.class); @@ -473,8 +423,6 @@ public class DAGClientTimelineImpl extends DAGClient { throw new TezException("Error accessing content from YARN Timeline - unexpected response", e); } catch (IllegalArgumentException e) { throw new TezException("Error accessing content from YARN Timeline - invalid url", e); - } catch (IOException e) { - throw new TezException("Error getting http client connection", e); } } @@ -512,119 +460,15 @@ public class DAGClientTimelineImpl extends DAGClient { } } - protected Client getHttpClient() throws IOException, TezException { + protected Client getHttpClient() { if (httpClient == null) { - if (UserGroupInformation.isSecurityEnabled()) { - final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - try { - final Token delegationToken = getDelegationToken(currentUser.getUserName()); - currentUser.addToken(delegationToken); - } catch (UndeclaredThrowableException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("exception getting httpclient token", e); - } - } - } - - ClientConfig clientConfig = new DefaultClientConfig(JSONRootElementProvider.App.class); - HttpURLConnectionFactory connectionFactory = new TimelineURLConnectionFactory(); - httpClient = new Client(new URLConnectionClientHandler(connectionFactory), clientConfig); + ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class); + HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory(); + httpClient = new Client(new URLConnectionClientHandler(urlFactory), config); } return httpClient; } - private Token getDelegationToken(final String renewer) throws - IOException, TezException { - authUgi.checkTGTAndReloginFromKeytab(); - try { - return authUgi.doAs(new PrivilegedExceptionAction>() { - @Override - public Token run() throws IOException, AuthenticationException { - try { - URI resURI = URI.create(baseUri); - DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(authenticator, connConfigurator); - return (Token) authUrl.getDelegationToken(resURI.toURL(), token, renewer, doAsUser); - } catch (IllegalArgumentException e) { - throw new IOException("invalid url " + baseUri, e); - } - } - }); - } catch (InterruptedException e) { - throw new TezException(e); - } - } - - private class TimelineURLConnectionFactory implements HttpURLConnectionFactory { - - @Override - public HttpURLConnection getHttpURLConnection(final URL url) throws IOException { - try { - return new DelegationTokenAuthenticatedURL( - authenticator, connConfigurator).openConnection(url, token, - doAsUser); - } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); - } catch (AuthenticationException ae) { - throw new IOException(ae); - } - } - - } - - private static ConnectionConfigurator newConnConfigurator(Configuration conf) { - try { - return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf); - } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot load customized ssl related configuration. " + - "Fallback to system-generic settings.", e); - } - return DEFAULT_TIMEOUT_CONN_CONFIGURATOR; - } - } - - private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR = - new ConnectionConfigurator() { - @Override - public HttpURLConnection configure(HttpURLConnection conn) - throws IOException { - setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT); - return conn; - } - }; - - private static ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf) - throws IOException, GeneralSecurityException { - final SSLFactory factory; - final SSLSocketFactory sf; - final HostnameVerifier hv; - - factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); - factory.init(); - sf = factory.createSSLSocketFactory(); - hv = factory.getHostnameVerifier(); - - return new ConnectionConfigurator() { - @Override - public HttpURLConnection configure(HttpURLConnection conn) - throws IOException { - if (conn instanceof HttpsURLConnection) { - HttpsURLConnection c = (HttpsURLConnection) conn; - c.setSSLSocketFactory(sf); - c.setHostnameVerifier(hv); - } - setTimeouts(conn, timeout); - return conn; - } - }; - } - - private static void setTimeouts(URLConnection connection, int socketTimeout) { - connection.setConnectTimeout(socketTimeout); - connection.setReadTimeout(socketTimeout); - } - private static final Map dagStateProtoMap = Collections.unmodifiableMap(new HashMap() {{ put("NEW", DAGStatusStateProto.DAG_SUBMITTED); @@ -651,4 +495,14 @@ public class DAGClientTimelineImpl extends DAGClient { put("TERMINATING", VertexStatusStateProto.VERTEX_TERMINATING); }}); + + class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory { + @Override + public HttpURLConnection getHttpURLConnection(URL url) throws IOException { + String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" + + URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8"); + return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection(); + } + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/d72cd949/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java index aafc28f..a72b799 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java @@ -53,7 +53,7 @@ public class TestATSHttpClient { } @Test(timeout = 5000) - public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException, IOException { + public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException { ApplicationId mockAppId = mock(ApplicationId.class); DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mockAppId, "EXAMPLE_DAG_ID", new TezConfiguration(), null);