Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2C55A200B74 for ; Thu, 1 Sep 2016 21:57:03 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2AE9D160AB5; Thu, 1 Sep 2016 19:57:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F3810160AA8 for ; Thu, 1 Sep 2016 21:57:01 +0200 (CEST) Received: (qmail 18401 invoked by uid 500); 1 Sep 2016 19:57:01 -0000 Mailing-List: contact commits-help@apex.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.apache.org Delivered-To: mailing list commits@apex.apache.org Received: (qmail 18392 invoked by uid 99); 1 Sep 2016 19:57:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Sep 2016 19:57:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1685CE00C4; Thu, 1 Sep 2016 19:57:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vrozov@apache.org To: commits@apex.apache.org Date: Thu, 01 Sep 2016 19:57:01 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] apex-core git commit: APEXCORE-515 Providing principal for token refresh archived-at: Thu, 01 Sep 2016 19:57:03 -0000 Repository: apex-core Updated Branches: refs/heads/APEXCORE-515 [created] c13b0dd41 APEXCORE-515 Providing principal for token refresh Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/dd5e95a0 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/dd5e95a0 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/dd5e95a0 Branch: refs/heads/APEXCORE-515 Commit: dd5e95a090214bdcff3232cbdcad3dc59c24eff9 Parents: d651edc Author: Pramod Immaneni Authored: Wed Aug 24 17:25:25 2016 -0700 Committer: Pramod Immaneni Committed: Thu Sep 1 12:26:43 2016 -0700 ---------------------------------------------------------------------- .../stram/StreamingAppMasterService.java | 3 +- .../stram/client/StramAppLauncher.java | 28 +++++++++--------- .../stram/engine/StreamingContainer.java | 3 +- .../stram/plan/logical/LogicalPlan.java | 1 + .../stram/security/StramUserLogin.java | 20 +++++++++---- .../stram/client/StramAppLauncherTest.java | 30 ++++++++++++++------ 6 files changed, 54 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 43ab743..15b6402 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -676,6 +676,7 @@ public class StreamingAppMasterService extends CompositeService long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME))); long expiryTime = System.currentTimeMillis() + tokenLifeTime; LOG.debug(" expiry token time {}", tokenLifeTime); + String principal = dag.getValue(LogicalPlan.PRINCIPAL); String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE); // Register self with ResourceManager @@ -753,7 +754,7 @@ public class StreamingAppMasterService extends CompositeService if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) { String applicationId = appAttemptID.getApplicationId().toString(); - expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true); + expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true); } if (currentTimeMillis > nodeReportUpdateTime) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java index 5024c38..619252f 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java @@ -560,14 +560,12 @@ public class StramAppLauncher return cl; } - private void setTokenRefreshKeytab(LogicalPlan dag, Configuration conf) throws IOException + private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException { - String keytabPath; - if ((keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE)) == null) { - String keytab; - if ((keytab = StramUserLogin.getKeytab()) == null) { - keytab = conf.get(StramUserLogin.DT_AUTH_KEYTAB); - } + String principal = StramUserLogin.getPrincipal(); + String keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE); + if (keytabPath == null) { + String keytab = StramUserLogin.getKeytab(); if (keytab != null) { Path localKeyTabPath = new Path(keytab); try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) { @@ -579,10 +577,11 @@ public class StramAppLauncher } } } - if (keytabPath != null) { + if ((principal != null) && (keytabPath != null)) { + dag.setAttribute(LogicalPlan.PRINCIPAL, principal); dag.setAttribute(LogicalPlan.KEY_TAB_FILE, keytabPath); } else { - LOG.warn("No keytab specified for refreshing tokens, application may not be able to run indefinitely"); + LOG.warn("Credentials for refreshing tokens not available, application may not be able to run indefinitely"); } } @@ -600,13 +599,12 @@ public class StramAppLauncher Configuration conf = propertiesBuilder.conf; conf.setEnum(StreamingApplication.ENVIRONMENT, StreamingApplication.Environment.CLUSTER); LogicalPlan dag = appConfig.createApp(propertiesBuilder); - long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT)); - dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime); - long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT)); - dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime); - // TODO:- Need to see if other token refresh attributes are needed if security is not enabled if (UserGroupInformation.isSecurityEnabled()) { - setTokenRefreshKeytab(dag, conf); + long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT)); + dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime); + long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT)); + dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime); + setTokenRefreshCredentials(dag, conf); } String tokenRefreshFactor = conf.get(StramClientUtils.TOKEN_ANTICIPATORY_REFRESH_FACTOR); if (tokenRefreshFactor != null && tokenRefreshFactor.trim().length() > 0) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 27688e3..2232bbf 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -616,11 +616,12 @@ public class StreamingContainer extends YarnContainerMain Token token = iter.next(); logger.debug("token: {}", token); } + String principal = containerContext.getValue(LogicalPlan.PRINCIPAL); String hdfsKeyTabFile = containerContext.getValue(LogicalPlan.KEY_TAB_FILE); while (!exitHeartbeatLoop) { if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= expiryTime && hdfsKeyTabFile != null) { - expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), containerId, conf, hdfsKeyTabFile, credentials, null, false); + expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), containerId, conf, principal, hdfsKeyTabFile, credentials, null, false); } synchronized (this.heartbeatTrigger) { try { http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index b301f9e..580d1bc 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -157,6 +157,7 @@ public class LogicalPlan implements Serializable, DAG public static Attribute FAST_PUBLISHER_SUBSCRIBER = new Attribute<>(false); public static Attribute HDFS_TOKEN_LIFE_TIME = new Attribute<>(604800000L); public static Attribute RM_TOKEN_LIFE_TIME = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); + public static Attribute PRINCIPAL = new Attribute(null, new StringCodec.String2String()); public static Attribute KEY_TAB_FILE = new Attribute<>((String)null, new StringCodec.String2String()); public static Attribute TOKEN_REFRESH_ANTICIPATORY_FACTOR = new Attribute<>(0.7); /** http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java index b2fa4e7..0c1d0c9 100644 --- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java +++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java @@ -49,7 +49,7 @@ public class StramUserLogin { private static final Logger LOG = LoggerFactory.getLogger(StramUserLogin.class); public static final String DT_AUTH_PREFIX = StreamingApplication.DT_PREFIX + "authentication."; - private static final String DT_AUTH_PRINCIPAL = DT_AUTH_PREFIX + "principal"; + public static final String DT_AUTH_PRINCIPAL = DT_AUTH_PREFIX + "principal"; public static final String DT_AUTH_KEYTAB = DT_AUTH_PREFIX + "keytab"; private static String principal; private static String keytab; @@ -57,12 +57,17 @@ public class StramUserLogin public static void attemptAuthentication(Configuration conf) throws IOException { if (UserGroupInformation.isSecurityEnabled()) { - String userPrincipal = conf.get(DT_AUTH_PRINCIPAL); - String userKeytab = conf.get(DT_AUTH_KEYTAB); - authenticate(userPrincipal, userKeytab); + authenticate(conf); } } + public static void authenticate(Configuration conf) throws IOException + { + String userPrincipal = conf.get(DT_AUTH_PRINCIPAL); + String userKeytab = conf.get(DT_AUTH_KEYTAB); + authenticate(userPrincipal, userKeytab); + } + public static void authenticate(String principal, String keytab) throws IOException { if ((principal != null) && !principal.isEmpty() @@ -79,7 +84,7 @@ public class StramUserLogin } } - public static long refreshTokens(long tokenLifeTime, String destinationDir, String destinationFile, final Configuration conf, String hdfsKeyTabFile, final Credentials credentials, final InetSocketAddress rmAddress, final boolean renewRMToken) throws IOException + public static long refreshTokens(long tokenLifeTime, String destinationDir, String destinationFile, final Configuration conf, String principal, String hdfsKeyTabFile, final Credentials credentials, final InetSocketAddress rmAddress, final boolean renewRMToken) throws IOException { long expiryTime = System.currentTimeMillis() + tokenLifeTime; //renew tokens @@ -93,7 +98,10 @@ public class StramUserLogin keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, destinationFile, hdfsKeyTabFile, conf); } - UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(UserGroupInformation.getCurrentUser().getUserName(), keyTabFile.getAbsolutePath()); + if (principal == null) { + principal = UserGroupInformation.getCurrentUser().getUserName(); + } + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFile.getAbsolutePath()); try { ugi.doAs(new PrivilegedExceptionAction() { http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java index b1856e1..ad1dbd6 100644 --- a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java +++ b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java @@ -42,9 +42,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.security.StramUserLogin; import static org.powermock.api.mockito.PowerMockito.method; -import static org.powermock.api.mockito.PowerMockito.spy; import static org.powermock.api.mockito.PowerMockito.suppress; -import static org.powermock.api.mockito.PowerMockito.when; /** * StramAppLauncher Test @@ -52,12 +50,18 @@ import static org.powermock.api.mockito.PowerMockito.when; @RunWith(Enclosed.class) public class StramAppLauncherTest { - @PrepareForTest({StramAppLauncher.class, StramUserLogin.class}) + + private static final String SET_TOKEN_REFRESH_CREDENTIALS_METHOD = "setTokenRefreshCredentials"; + + @PrepareForTest({StramAppLauncher.class}) @PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"}) public static class RefreshTokenTests { File workspace; File sourceKeytab; + File dfsDir; + + static final String principal = "username/group@domain"; @Rule public PowerMockRule rule = new PowerMockRule(); @@ -77,6 +81,7 @@ public class StramAppLauncherTest } catch (IOException e) { throw new RuntimeException(e); } + dfsDir = new File(workspace, "dst"); suppress(method(StramAppLauncher.class, "init")); } @@ -92,17 +97,24 @@ public class StramAppLauncherTest public void testGetTokenRefreshKeytab() throws Exception { Configuration conf = new Configuration(false); - conf.set(StramClientUtils.KEY_TAB_FILE, sourceKeytab.getPath()); + File storeKeytab = new File(dfsDir, "keytab2"); + conf.set(StramClientUtils.KEY_TAB_FILE, storeKeytab.getPath()); + StramUserLogin.authenticate(principal, sourceKeytab.getPath()); LogicalPlan dag = applyTokenRefreshKeytab(FileSystem.newInstance(conf), conf); - Assert.assertEquals("Token refresh keytab path", sourceKeytab.getPath(), dag.getValue(LogicalPlan.KEY_TAB_FILE)); + Assert.assertEquals("Token refresh principal", principal, dag.getValue(LogicalPlan.PRINCIPAL)); + Assert.assertEquals("Token refresh keytab path", storeKeytab.getPath(), dag.getValue(LogicalPlan.KEY_TAB_FILE)); } @Test public void testUserLoginTokenRefreshKeytab() throws Exception { Configuration conf = new Configuration(false); + /* spy(StramUserLogin.class); + when(StramUserLogin.getPrincipal()).thenReturn(principal); when(StramUserLogin.getKeytab()).thenReturn(sourceKeytab.getPath()); + */ + StramUserLogin.authenticate(principal, sourceKeytab.getPath()); testDFSTokenPath(conf); } @@ -110,25 +122,27 @@ public class StramAppLauncherTest public void testAuthPropTokenRefreshKeytab() throws Exception { Configuration conf = new Configuration(false); + conf.set(StramUserLogin.DT_AUTH_PRINCIPAL, principal); conf.set(StramUserLogin.DT_AUTH_KEYTAB, sourceKeytab.getPath()); + StramUserLogin.authenticate(conf); testDFSTokenPath(conf); } private void testDFSTokenPath(Configuration conf) throws Exception { FileSystem fs = FileSystem.newInstance(conf); - File dfsDir = new File(workspace, "dst"); conf.set(StramClientUtils.DT_DFS_ROOT_DIR, dfsDir.getAbsolutePath()); LogicalPlan dag = applyTokenRefreshKeytab(fs, conf); + Assert.assertEquals("Token refresh principal", principal, dag.getValue(LogicalPlan.PRINCIPAL)); Assert.assertEquals("Token refresh keytab path", new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), - new File(dfsDir, "keytab").getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE)); + new File(dfsDir, sourceKeytab.getName()).getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE)); } private LogicalPlan applyTokenRefreshKeytab(FileSystem fs, Configuration conf) throws Exception { LogicalPlan dag = new LogicalPlan(); StramAppLauncher appLauncher = new StramAppLauncher(fs, conf); - Whitebox.invokeMethod(appLauncher, "setTokenRefreshKeytab", dag, conf); + Whitebox.invokeMethod(appLauncher, SET_TOKEN_REFRESH_CREDENTIALS_METHOD, dag, conf); return dag; } }