accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject accumulo git commit: ACCUMULO-3707 Get sequential.xml running mapreduce successfully.
Date Thu, 02 Apr 2015 03:56:47 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 91c297fa3 -> fbbe8c321


ACCUMULO-3707 Get sequential.xml running mapreduce successfully.

Need to watch for when we have a keytab, and use that to log
in instead of a password[token]. Also added the missing convenience
check in AccumuloOutputFormat to automatically fetch a DelegationToken
when a KerberosToken is passed in.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/fbbe8c32
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/fbbe8c32
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/fbbe8c32

Branch: refs/heads/master
Commit: fbbe8c3211bef7c992035749535fb77812af8177
Parents: 91c297f
Author: Josh Elser <elserj@apache.org>
Authored: Wed Apr 1 19:16:40 2015 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Wed Apr 1 19:47:05 2015 -0400

----------------------------------------------------------------------
 .../client/mapred/AccumuloOutputFormat.java     | 28 ++++++++++
 .../client/mapreduce/AccumuloOutputFormat.java  | 28 ++++++++++
 .../randomwalk/sequential/MapRedVerify.java     |  3 ++
 .../randomwalk/sequential/MapRedVerifyTool.java | 56 +++++++++++++++++---
 4 files changed, 109 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/fbbe8c32/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
index 4e95a4a..8631a2c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
@@ -34,15 +34,19 @@ import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.SecurityErrorCode;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
@@ -50,6 +54,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -89,6 +94,29 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation>
{
    * @since 1.5.0
    */
   public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken
token) throws AccumuloSecurityException {
+    if (token instanceof KerberosToken) {
+      log.info("Received KerberosToken, attempting to fetch DelegationToken");
+      try {
+        Instance instance = getInstance(job);
+        Connector conn = instance.getConnector(principal, token);
+        token = conn.securityOperations().getDelegationToken(new DelegationTokenConfig());
+      } catch (Exception e) {
+        log.warn("Failed to automatically obtain DelegationToken, Mappers/Reducers will likely
fail to communicate with Accumulo", e);
+      }
+    }
+    // DelegationTokens can be passed securely from user to task without serializing insecurely
in the configuration
+    if (token instanceof DelegationToken) {
+      DelegationToken delegationToken = (DelegationToken) token;
+
+      // Convert it into a Hadoop Token
+      AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier();
+      Token<AuthenticationTokenIdentifier> hadoopToken = new Token<>(identifier.getBytes(),
delegationToken.getPassword(), identifier.getKind(),
+          delegationToken.getServiceName());
+
+      // Add the Hadoop Token to the Job so it gets serialized and passed along.
+      job.getCredentials().addToken(hadoopToken.getService(), hadoopToken);
+    }
+
     OutputConfigurator.setConnectorInfo(CLASS, job, principal, token);
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fbbe8c32/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 3164e4a..c4ab313 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -34,15 +34,19 @@ import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.SecurityErrorCode;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -52,6 +56,7 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -90,6 +95,29 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    * @since 1.5.0
    */
   public static void setConnectorInfo(Job job, String principal, AuthenticationToken token)
throws AccumuloSecurityException {
+    if (token instanceof KerberosToken) {
+      log.info("Received KerberosToken, attempting to fetch DelegationToken");
+      try {
+        Instance instance = getInstance(job);
+        Connector conn = instance.getConnector(principal, token);
+        token = conn.securityOperations().getDelegationToken(new DelegationTokenConfig());
+      } catch (Exception e) {
+        log.warn("Failed to automatically obtain DelegationToken, Mappers/Reducers will likely
fail to communicate with Accumulo", e);
+      }
+    }
+    // DelegationTokens can be passed securely from user to task without serializing insecurely
in the configuration
+    if (token instanceof DelegationToken) {
+      DelegationToken delegationToken = (DelegationToken) token;
+
+      // Convert it into a Hadoop Token
+      AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier();
+      Token<AuthenticationTokenIdentifier> hadoopToken = new Token<>(identifier.getBytes(),
delegationToken.getPassword(), identifier.getKind(),
+          delegationToken.getServiceName());
+
+      // Add the Hadoop Token to the Job so it gets serialized and passed along.
+      job.getCredentials().addToken(hadoopToken.getService(), hadoopToken);
+    }
+
     OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fbbe8c32/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/MapRedVerify.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/MapRedVerify.java
b/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/MapRedVerify.java
index 95c1d0b..20cd686 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/MapRedVerify.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/MapRedVerify.java
@@ -41,6 +41,9 @@ public class MapRedVerify extends Test {
     args[1] = getMapReduceJars();
     args[2] = env.getUserName();
     args[3] = env.getPassword();
+    if (null == args[3]) {
+      args[3] = env.getKeytab();
+    }
     args[4] = state.getString("seqTableName");
     args[5] = env.getInstance().getInstanceName();
     args[6] = env.getConfigProperty("ZOOKEEPERS");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fbbe8c32/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/MapRedVerifyTool.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/MapRedVerifyTool.java
b/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/MapRedVerifyTool.java
index 1e384ed..8c907ae 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/MapRedVerifyTool.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/MapRedVerifyTool.java
@@ -20,12 +20,19 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.SystemPermission;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -33,6 +40,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Logger;
 
@@ -86,12 +94,51 @@ public class MapRedVerifyTool extends Configured implements Tool {
       return 1;
     }
 
-    ClientConfiguration clientConf = new ClientConfiguration().withInstance(args[3]).withZkHosts(args[4]);
+    ClientConfiguration clientConf = ClientConfiguration.loadDefault().withInstance(args[3]).withZkHosts(args[4]);
 
-    job.setInputFormatClass(AccumuloInputFormat.class);
-    AccumuloInputFormat.setConnectorInfo(job, args[0], new PasswordToken(args[1]));
     AccumuloInputFormat.setInputTableName(job, args[2]);
     AccumuloInputFormat.setZooKeeperInstance(job, clientConf);
+    AccumuloOutputFormat.setDefaultTableName(job, args[5]);
+    AccumuloOutputFormat.setZooKeeperInstance(job, clientConf);
+
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false))
{
+      // Better be logged in
+      KerberosToken token = new KerberosToken();
+      try {
+        UserGroupInformation user = UserGroupInformation.getCurrentUser();
+        if (!user.hasKerberosCredentials()) {
+          throw new IllegalStateException("Expected current user to have Kerberos credentials");
+        }
+
+        String newPrincipal = user.getUserName();
+
+        ZooKeeperInstance inst = new ZooKeeperInstance(clientConf);
+        Connector conn = inst.getConnector(newPrincipal, token);
+
+        // Do the explicit check to see if the user has the permission to get a delegation
token
+        if (!conn.securityOperations().hasSystemPermission(conn.whoami(), SystemPermission.OBTAIN_DELEGATION_TOKEN))
{
+          log.error(newPrincipal + " doesn't have the " + SystemPermission.OBTAIN_DELEGATION_TOKEN.name()
+              + " SystemPermission neccesary to obtain a delegation token. MapReduce tasks
cannot automatically use the client's"
+              + " credentials on remote servers. Delegation tokens provide a means to run
MapReduce without distributing the user's credentials.");
+          throw new IllegalStateException(conn.whoami() + " does not have permission to obtain
a delegation token");
+        }
+
+        // Fetch a delegation token from Accumulo
+        DelegationToken dt = conn.securityOperations().getDelegationToken(new DelegationTokenConfig());
+
+        // Set the delegation token instead of the kerberos token
+        AccumuloInputFormat.setConnectorInfo(job, newPrincipal, dt);
+        AccumuloOutputFormat.setConnectorInfo(job, newPrincipal, dt);
+      } catch (Exception e) {
+        final String msg = "Failed to acquire DelegationToken for use with MapReduce";
+        log.error(msg, e);
+        throw new RuntimeException(msg, e);
+      }
+    } else {
+      AccumuloInputFormat.setConnectorInfo(job, args[0], new PasswordToken(args[1]));
+      AccumuloOutputFormat.setConnectorInfo(job, args[0], new PasswordToken(args[1]));
+    }
 
     job.setMapperClass(SeqMapClass.class);
     job.setMapOutputKeyClass(NullWritable.class);
@@ -101,10 +148,7 @@ public class MapRedVerifyTool extends Configured implements Tool {
     job.setNumReduceTasks(1);
 
     job.setOutputFormatClass(AccumuloOutputFormat.class);
-    AccumuloOutputFormat.setConnectorInfo(job, args[0], new PasswordToken(args[1]));
     AccumuloOutputFormat.setCreateTables(job, true);
-    AccumuloOutputFormat.setDefaultTableName(job, args[5]);
-    AccumuloOutputFormat.setZooKeeperInstance(job, clientConf);
 
     job.waitForCompletion(true);
     return job.isSuccessful() ? 0 : 1;


Mime
View raw message