apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject [1/2] apex-core git commit: APEXCORE-515 Providing principal for token refresh
Date Thu, 01 Sep 2016 19:57:01 GMT
Repository: apex-core
Updated Branches:
  refs/heads/APEXCORE-515 [created] c13b0dd41


APEXCORE-515 Providing principal for token refresh


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/dd5e95a0
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/dd5e95a0
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/dd5e95a0

Branch: refs/heads/APEXCORE-515
Commit: dd5e95a090214bdcff3232cbdcad3dc59c24eff9
Parents: d651edc
Author: Pramod Immaneni <pramod@datatorrent.com>
Authored: Wed Aug 24 17:25:25 2016 -0700
Committer: Pramod Immaneni <pramod@datatorrent.com>
Committed: Thu Sep 1 12:26:43 2016 -0700

----------------------------------------------------------------------
 .../stram/StreamingAppMasterService.java        |  3 +-
 .../stram/client/StramAppLauncher.java          | 28 +++++++++---------
 .../stram/engine/StreamingContainer.java        |  3 +-
 .../stram/plan/logical/LogicalPlan.java         |  1 +
 .../stram/security/StramUserLogin.java          | 20 +++++++++----
 .../stram/client/StramAppLauncherTest.java      | 30 ++++++++++++++------
 6 files changed, 54 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 43ab743..15b6402 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -676,6 +676,7 @@ public class StreamingAppMasterService extends CompositeService
     long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)));
     long expiryTime = System.currentTimeMillis() + tokenLifeTime;
     LOG.debug(" expiry token time {}", tokenLifeTime);
+    String principal = dag.getValue(LogicalPlan.PRINCIPAL);
     String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE);
 
     // Register self with ResourceManager
@@ -753,7 +754,7 @@ public class StreamingAppMasterService extends CompositeService
 
       if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
         String applicationId = appAttemptID.getApplicationId().toString();
-        expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(),
applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true);
+        expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(),
applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true);
       }
 
       if (currentTimeMillis > nodeReportUpdateTime) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index 5024c38..619252f 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -560,14 +560,12 @@ public class StramAppLauncher
     return cl;
   }
 
-  private void setTokenRefreshKeytab(LogicalPlan dag, Configuration conf) throws IOException
+  private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException
   {
-    String keytabPath;
-    if ((keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE)) == null) {
-      String keytab;
-      if ((keytab = StramUserLogin.getKeytab()) == null) {
-        keytab = conf.get(StramUserLogin.DT_AUTH_KEYTAB);
-      }
+    String principal = StramUserLogin.getPrincipal();
+    String keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE);
+    if (keytabPath == null) {
+      String keytab = StramUserLogin.getKeytab();
       if (keytab != null) {
         Path localKeyTabPath = new Path(keytab);
         try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) {
@@ -579,10 +577,11 @@ public class StramAppLauncher
         }
       }
     }
-    if (keytabPath != null) {
+    if ((principal != null) && (keytabPath != null)) {
+      dag.setAttribute(LogicalPlan.PRINCIPAL, principal);
       dag.setAttribute(LogicalPlan.KEY_TAB_FILE, keytabPath);
     } else {
-      LOG.warn("No keytab specified for refreshing tokens, application may not be able to run indefinitely");
+      LOG.warn("Credentials for refreshing tokens not available, application may not be able to run indefinitely");
     }
   }
 
@@ -600,13 +599,12 @@ public class StramAppLauncher
     Configuration conf = propertiesBuilder.conf;
     conf.setEnum(StreamingApplication.ENVIRONMENT, StreamingApplication.Environment.CLUSTER);
     LogicalPlan dag = appConfig.createApp(propertiesBuilder);
-    long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME,
conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
-    dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime);
-    long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
-    dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime);
-    // TODO:- Need to see if other token refresh attributes are needed if security is not enabled
     if (UserGroupInformation.isSecurityEnabled()) {
-      setTokenRefreshKeytab(dag, conf);
+      long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME,
conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
+      dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime);
+      long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME,
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
+      dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime);
+      setTokenRefreshCredentials(dag, conf);
     }
     String tokenRefreshFactor = conf.get(StramClientUtils.TOKEN_ANTICIPATORY_REFRESH_FACTOR);
     if (tokenRefreshFactor != null && tokenRefreshFactor.trim().length() > 0) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 27688e3..2232bbf 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -616,11 +616,12 @@ public class StreamingContainer extends YarnContainerMain
       Token<?> token = iter.next();
       logger.debug("token: {}", token);
     }
+    String principal = containerContext.getValue(LogicalPlan.PRINCIPAL);
     String hdfsKeyTabFile = containerContext.getValue(LogicalPlan.KEY_TAB_FILE);
     while (!exitHeartbeatLoop) {
 
       if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= expiryTime && hdfsKeyTabFile != null) {
-        expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(),
containerId, conf, hdfsKeyTabFile, credentials, null, false);
+        expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(),
containerId, conf, principal, hdfsKeyTabFile, credentials, null, false);
       }
       synchronized (this.heartbeatTrigger) {
         try {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index b301f9e..580d1bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -157,6 +157,7 @@ public class LogicalPlan implements Serializable, DAG
   public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new Attribute<>(false);
   public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new Attribute<>(604800000L);
   public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+  public static Attribute<String> PRINCIPAL = new Attribute<String>(null, new StringCodec.String2String());
   public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, new StringCodec.String2String());
   public static Attribute<Double> TOKEN_REFRESH_ANTICIPATORY_FACTOR = new Attribute<>(0.7);
   /**

http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
index b2fa4e7..0c1d0c9 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java
@@ -49,7 +49,7 @@ public class StramUserLogin
 {
   private static final Logger LOG = LoggerFactory.getLogger(StramUserLogin.class);
   public static final String DT_AUTH_PREFIX = StreamingApplication.DT_PREFIX + "authentication.";
-  private static final String DT_AUTH_PRINCIPAL = DT_AUTH_PREFIX + "principal";
+  public static final String DT_AUTH_PRINCIPAL = DT_AUTH_PREFIX + "principal";
   public static final String DT_AUTH_KEYTAB = DT_AUTH_PREFIX + "keytab";
   private static String principal;
   private static String keytab;
@@ -57,12 +57,17 @@ public class StramUserLogin
   public static void attemptAuthentication(Configuration conf) throws IOException
   {
     if (UserGroupInformation.isSecurityEnabled()) {
-      String userPrincipal = conf.get(DT_AUTH_PRINCIPAL);
-      String userKeytab = conf.get(DT_AUTH_KEYTAB);
-      authenticate(userPrincipal, userKeytab);
+      authenticate(conf);
     }
   }
 
+  public static void authenticate(Configuration conf) throws IOException
+  {
+    String userPrincipal = conf.get(DT_AUTH_PRINCIPAL);
+    String userKeytab = conf.get(DT_AUTH_KEYTAB);
+    authenticate(userPrincipal, userKeytab);
+  }
+
   public static void authenticate(String principal, String keytab) throws IOException
   {
     if ((principal != null) && !principal.isEmpty()
@@ -79,7 +84,7 @@ public class StramUserLogin
     }
   }
 
-  public static long refreshTokens(long tokenLifeTime, String destinationDir, String destinationFile,
final Configuration conf, String hdfsKeyTabFile, final Credentials credentials, final InetSocketAddress
rmAddress, final boolean renewRMToken) throws IOException
+  public static long refreshTokens(long tokenLifeTime, String destinationDir, String destinationFile,
final Configuration conf, String principal, String hdfsKeyTabFile, final Credentials credentials,
final InetSocketAddress rmAddress, final boolean renewRMToken) throws IOException
   {
     long expiryTime = System.currentTimeMillis() + tokenLifeTime;
     //renew tokens
@@ -93,7 +98,10 @@ public class StramUserLogin
       keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, destinationFile, hdfsKeyTabFile,
conf);
     }
 
-    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(UserGroupInformation.getCurrentUser().getUserName(),
keyTabFile.getAbsolutePath());
+    if (principal == null) {
+      principal = UserGroupInformation.getCurrentUser().getUserName();
+    }
+    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal,
keyTabFile.getAbsolutePath());
     try {
       ugi.doAs(new PrivilegedExceptionAction<Object>()
       {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
index b1856e1..ad1dbd6 100644
--- a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
@@ -42,9 +42,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.security.StramUserLogin;
 
 import static org.powermock.api.mockito.PowerMockito.method;
-import static org.powermock.api.mockito.PowerMockito.spy;
 import static org.powermock.api.mockito.PowerMockito.suppress;
-import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * StramAppLauncher Test
@@ -52,12 +50,18 @@ import static org.powermock.api.mockito.PowerMockito.when;
 @RunWith(Enclosed.class)
 public class StramAppLauncherTest
 {
-  @PrepareForTest({StramAppLauncher.class, StramUserLogin.class})
+
+  private static final String SET_TOKEN_REFRESH_CREDENTIALS_METHOD = "setTokenRefreshCredentials";
+
+  @PrepareForTest({StramAppLauncher.class})
   @PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"})
   public static class RefreshTokenTests
   {
     File workspace;
     File sourceKeytab;
+    File dfsDir;
+
+    static final String principal = "username/group@domain";
 
     @Rule
     public PowerMockRule rule = new PowerMockRule();
@@ -77,6 +81,7 @@ public class StramAppLauncherTest
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
+        dfsDir = new File(workspace, "dst");
         suppress(method(StramAppLauncher.class, "init"));
       }
 
@@ -92,17 +97,24 @@ public class StramAppLauncherTest
     public void testGetTokenRefreshKeytab() throws Exception
     {
       Configuration conf = new Configuration(false);
-      conf.set(StramClientUtils.KEY_TAB_FILE, sourceKeytab.getPath());
+      File storeKeytab = new File(dfsDir, "keytab2");
+      conf.set(StramClientUtils.KEY_TAB_FILE, storeKeytab.getPath());
+      StramUserLogin.authenticate(principal, sourceKeytab.getPath());
       LogicalPlan dag = applyTokenRefreshKeytab(FileSystem.newInstance(conf), conf);
-      Assert.assertEquals("Token refresh keytab path", sourceKeytab.getPath(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
+      Assert.assertEquals("Token refresh principal", principal, dag.getValue(LogicalPlan.PRINCIPAL));
+      Assert.assertEquals("Token refresh keytab path", storeKeytab.getPath(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
     }
 
     @Test
     public void testUserLoginTokenRefreshKeytab() throws Exception
     {
       Configuration conf = new Configuration(false);
+      /*
       spy(StramUserLogin.class);
+      when(StramUserLogin.getPrincipal()).thenReturn(principal);
       when(StramUserLogin.getKeytab()).thenReturn(sourceKeytab.getPath());
+      */
+      StramUserLogin.authenticate(principal, sourceKeytab.getPath());
       testDFSTokenPath(conf);
     }
 
@@ -110,25 +122,27 @@ public class StramAppLauncherTest
     public void testAuthPropTokenRefreshKeytab() throws Exception
     {
       Configuration conf = new Configuration(false);
+      conf.set(StramUserLogin.DT_AUTH_PRINCIPAL, principal);
       conf.set(StramUserLogin.DT_AUTH_KEYTAB, sourceKeytab.getPath());
+      StramUserLogin.authenticate(conf);
       testDFSTokenPath(conf);
     }
 
     private void testDFSTokenPath(Configuration conf) throws Exception
     {
       FileSystem fs = FileSystem.newInstance(conf);
-      File dfsDir = new File(workspace, "dst");
       conf.set(StramClientUtils.DT_DFS_ROOT_DIR, dfsDir.getAbsolutePath());
       LogicalPlan dag = applyTokenRefreshKeytab(fs, conf);
+      Assert.assertEquals("Token refresh principal", principal, dag.getValue(LogicalPlan.PRINCIPAL));
       Assert.assertEquals("Token refresh keytab path", new Path(fs.getUri().getScheme(),
fs.getUri().getAuthority(),
-          new File(dfsDir, "keytab").getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
+          new File(dfsDir, sourceKeytab.getName()).getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
     }
 
     private LogicalPlan applyTokenRefreshKeytab(FileSystem fs, Configuration conf) throws Exception
     {
       LogicalPlan dag = new LogicalPlan();
       StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
-      Whitebox.invokeMethod(appLauncher, "setTokenRefreshKeytab", dag, conf);
+      Whitebox.invokeMethod(appLauncher, SET_TOKEN_REFRESH_CREDENTIALS_METHOD, dag, conf);
       return dag;
     }
   }


Mime
View raw message