hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r1077096 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/ contrib/streaming/ mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/ mapred/org/apache/hadoop/mapreduce/security/ test/or...
Date Fri, 04 Mar 2011 03:40:04 GMT
Author: omalley
Date: Fri Mar  4 03:40:03 2011
New Revision: 1077096

URL: http://svn.apache.org/viewvc?rev=1077096&view=rev
Log:
commit 7e75a13e37b76c54c0a68d2927703ab4c878439d
Author: Jitendra Nath Pandey <jitendra@yahoo-inc.com>
Date:   Wed Jan 6 13:53:05 2010 -0800

    MAPREDUCE-1026 from https://issues.apache.org/jira/secure/attachment/12429584/MR-1026-0_20.2.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1026. Shuffle should be secure. (Jitendra Nath Pandey)
    +
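
In outline, the patch works as follows: the JobTracker generates a per-job
HmacSHA1 key and writes it to a "jobTokens" file in the job's system
directory; the TaskTracker localizes that file and hands its path to the
child JVM through the JOB_TOKEN_FILE environment variable; the reduce-side
fetcher sends an HMAC of each map-output URL in a "UrlHash" HTTP header; and
the serving TaskTracker verifies that hash and answers with an HMAC of it in
a "ReplyHash" header, which the fetcher checks in turn. A minimal sketch of
that round trip using only the SecureShuffleUtils API added below (the
message string is an illustrative value, not one produced by a real job):

    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;

    public class ShuffleHandshakeSketch {
      public static void main(String[] args) throws Exception {
        // per-job key, normally generated by the JobTracker and shipped
        // to the tasks in the jobTokens file
        byte[] jobToken = SecureShuffleUtils.getNewEncodedKey();
        SecureShuffleUtils fetcher = new SecureShuffleUtils(jobToken);
        SecureShuffleUtils server = new SecureShuffleUtils(jobToken);

        // port + path + "?" + query, as SecureShuffleUtils.buildMsgFrom() builds it
        String msg = "8080/mapOutput?job=job_001&reduce=0&map=attempt_001";

        // fetcher -> server: HMAC of the URL, carried in the UrlHash header
        String urlHash = fetcher.hashFromString(msg);

        // server: recompute and compare; throws IOException on mismatch
        server.verifyReply(urlHash, msg);

        // server -> fetcher: HMAC of that hash, carried in the ReplyHash header
        String replyHash = server.generateHash(urlHash.getBytes());

        // fetcher: confirm the server also knows the job token
        fetcher.verifyReply(replyHash, urlHash);
        System.out.println("handshake verified");
      }
    }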

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/JobTokens.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/ivy.xml
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/ivy.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/ivy.xml?rev=1077096&r1=1077095&r2=1077096&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/ivy.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/ivy.xml Fri Mar  4 03:40:03 2011
@@ -52,5 +52,9 @@
       name="commons-httpclient"
       rev="${commons-httpclient.version}"
       conf="common->master"/> 
+    <dependency org="commons-codec"
+      name="commons-codec"
+      rev="${commons-codec.version}"
+      conf="common->default"/>
   </dependencies>
 </ivy-module>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/ivy.xml?rev=1077096&r1=1077095&r2=1077096&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/ivy.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/ivy.xml Fri Mar  4 03:40:03 2011
@@ -76,5 +76,9 @@
       name="commons-net"
       rev="${commons-net.version}"
       conf="common->master"/>  -->
+    <dependency org="commons-codec"
+      name="commons-codec"
+      rev="${commons-codec.version}"
+      conf="common->default"/>
     </dependencies>
 </ivy-module>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077096&r1=1077095&r2=1077096&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 03:40:03 2011
@@ -26,12 +26,15 @@ import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.security.JobTokens;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
@@ -62,6 +65,13 @@ class Child {
     final int SLEEP_LONGER_COUNT = 5;
     int jvmIdInt = Integer.parseInt(args[3]);
     JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
+
+    // the file name is passed through the environment
+    String jobTokenFile = System.getenv().get("JOB_TOKEN_FILE");
+    FileSystem localFs = FileSystem.getLocal(defaultConf);
+    JobTokens jt = loadJobTokens(jobTokenFile, localFs);
+    LOG.debug("Child: got jobTokenfile=" + jobTokenFile);
+
     TaskUmbilicalProtocol umbilical =
       (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
           TaskUmbilicalProtocol.versionID,
@@ -138,6 +148,10 @@ class Child {
         //are viewable immediately
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
         JobConf job = new JobConf(task.getJobFile());
+
+        // set the jobTokenFile into task
+        task.setJobTokens(jt);
+
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
         //the symlinks have to be rebuilt.
@@ -203,4 +217,22 @@ class Child {
       LogManager.shutdown();
     }
   }
+  
+  /**
+   * load secret keys from a file
+   * @param jobTokenFile
+   * @param localFS
+   * @throws IOException
+   */
+  private static JobTokens loadJobTokens(String jobTokenFile, FileSystem localFS) 
+  throws IOException {
+    Path localJobTokenFile = new Path (jobTokenFile);
+    FSDataInputStream in = localFS.open(localJobTokenFile);
+    JobTokens jt = new JobTokens();
+    jt.readFields(in);
+        
+    LOG.debug("Loaded jobTokenFile from: "+localJobTokenFile.toUri().getPath());
+    in.close();
+    return jt;
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077096&r1=1077095&r2=1077096&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:40:03 2011
@@ -41,6 +41,8 @@ import org.apache.hadoop.fs.LocalFileSys
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -49,6 +51,7 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /*************************************************************
@@ -495,13 +498,18 @@ class JobInProgress {
     // log the job priority
     setPriority(this.priority);
     
+    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
+    FileSystem fs = jobDir.getFileSystem(conf);
+    //
+    // generate security keys needed by Tasks
+    //
+    generateJobTokens(fs, jobtracker.getSystemDirectoryForJob(jobId));
+    
     //
    // read input splits and create a map per split
     //
     String jobFile = profile.getJobFile();
 
-    Path sysDir = new Path(this.jobtracker.getSystemDir());
-    FileSystem fs = sysDir.getFileSystem(conf);
     DataInputStream splitFile =
       fs.open(new Path(conf.get("mapred.job.split.file")));
     JobClient.RawSplit[] splits;
@@ -3069,4 +3077,30 @@ class JobInProgress {
       );
     }
   }
+  
+  /**
+   * generate keys and save them to a file
+   * @param jobDir
+   * @throws IOException
+   */
+  private void generateJobTokens(FileSystem fs, Path jobDir) throws IOException{
+    Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME);
+    FSDataOutputStream os = fs.create(keysFile);
+    //create JobTokens file and add key to it
+    JobTokens jt = new JobTokens();
+    byte [] key;
+    try {
+      // new key
+      key = SecureShuffleUtils.getNewEncodedKey();
+    } catch (java.security.GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+    // remember the key 
+    jt.setShuffleJobToken(key);
+    // other keys..
+    jt.write(os);
+    os.close();
+    LOG.debug("jobTokens generated and stored in "+ keysFile.toUri().getPath());
+  }
+
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077096&r1=1077095&r2=1077096&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar  4 03:40:03 2011
@@ -85,6 +85,11 @@ import org.apache.hadoop.util.Progressab
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.commons.codec.binary.Base64;
+import java.security.GeneralSecurityException;
+
 /** A Reduce task. */
 class ReduceTask extends Task {
 
@@ -1147,11 +1152,15 @@ class ReduceTask extends Task {
       private CompressionCodec codec = null;
       private Decompressor decompressor = null;
       
-      public MapOutputCopier(JobConf job, Reporter reporter) {
+      private final byte[] shuffleJobToken;
+      
+      public MapOutputCopier(JobConf job, Reporter reporter, byte [] shuffleJobToken) {
         setName("MapOutputCopier " + reduceTask.getTaskID() + "." + id);
         LOG.debug(getName() + " created");
         this.reporter = reporter;
-        
+
+        this.shuffleJobToken = shuffleJobToken;       	
+ 
         shuffleConnectionTimeout =
           job.getInt("mapreduce.reduce.shuffle.connect.timeout", STALLED_COPY_TIMEOUT);
         shuffleReadTimeout =
@@ -1376,11 +1385,31 @@ class ReduceTask extends Task {
                                      Path filename, int reduce)
       throws IOException, InterruptedException {
         // Connect
-        URLConnection connection = 
-          mapOutputLoc.getOutputLocation().openConnection();
+        URL url = mapOutputLoc.getOutputLocation();
+        URLConnection connection = url.openConnection();
+
+        // generate hash of the url
+        SecureShuffleUtils ssutil = new SecureShuffleUtils(shuffleJobToken);
+        String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+        String encHash = ssutil.hashFromString(msgToEncode);
+
+        // put url hash into http header
+        connection.addRequestProperty(
+            SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+        
         InputStream input = getInputStream(connection, shuffleConnectionTimeout,
                                            shuffleReadTimeout); 
-        
+
+        // get the replyHash which is HMac of the encHash we sent to the server
+        String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+        if(replyHash==null) {
+          throw new IOException("security validation of TT Map output failed");
+        }       
+        LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
+        // verify that replyHash is HMac of encHash
+        ssutil.verifyReply(replyHash, encHash);
+        LOG.info("for url="+msgToEncode+" sent hash and received reply");
+ 
         // Validate header from map output
         TaskAttemptID mapId = null;
         try {
@@ -1864,7 +1893,8 @@ class ReduceTask extends Task {
       
       // start all the copying threads
       for (int i=0; i < numCopiers; i++) {
-        MapOutputCopier copier = new MapOutputCopier(conf, reporter);
+        MapOutputCopier copier = new MapOutputCopier(conf, reporter,
+            reduceTask.getJobTokens().getShuffleJobToken());
         copiers.add(copier);
         copier.start();
       }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077096&r1=1077095&r2=1077096&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 03:40:03 2011
@@ -47,11 +47,14 @@ import org.apache.hadoop.io.serializer.D
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 
 /** 
  * Base class for tasks.
@@ -147,6 +150,7 @@ abstract public class Task implements Wr
   private String pidFile = "";
   protected TaskUmbilicalProtocol umbilical;
   private int numSlotsRequired;
+  protected JobTokens jobTokens=null; // storage of the secret keys
 
   ////////////////////////////////////////////
   // Constructors
@@ -198,6 +202,23 @@ abstract public class Task implements Wr
   public JobID getJobID() {
     return taskId.getJobID();
   }
+
+  /**
+   * set JobToken storage 
+   * @param jt
+   */
+  public void setJobTokens(JobTokens jt) {
+    this.jobTokens = jt;
+  }
+
+  /**
+   * get JobToken storage
+   * @return storage object
+   */
+  public JobTokens getJobTokens() {
+    return this.jobTokens;
+  }
+
   
   /**
    * Get the index of this task within the job.
@@ -1264,7 +1285,6 @@ abstract public class Task implements Wr
                                                 reporter, comparator, keyClass,
                                                 valueClass);
       reducer.run(reducerContext);
-    }
-    
+    } 
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077096&r1=1077095&r2=1077096&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:40:03 2011
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 import java.net.URI;
+import org.apache.hadoop.mapreduce.JobContext;
 
 /** Base class that runs a task in a separate process.  Tasks are run in a
  * separate process in order to isolate the map/reduce system code from bugs in
@@ -449,6 +450,10 @@ abstract class TaskRunner extends Thread
       }
       env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
 
+      String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
+      LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
+      env.put("JOB_TOKEN_FILE", jobTokenFile);
+
       // for the child of task jvm, set hadoop.root.logger
       env.put("HADOOP_ROOT_LOGGER","INFO,TLA");
       String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077096&r1=1077095&r2=1077096&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:40:03 2011
@@ -74,6 +74,8 @@ import org.apache.hadoop.mapred.TaskStat
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -186,7 +188,7 @@ public class TaskTracker 
    * Map from taskId -> TaskInProgress.
    */
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
-  Map<JobID, RunningJob> runningJobs = null;
+  Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
   volatile int mapTotal = 0;
   volatile int reduceTotal = 0;
   boolean justStarted = true;
@@ -211,6 +213,7 @@ public class TaskTracker 
   private static final String CACHEDIR = "archive";
   private static final String JOBCACHE = "jobcache";
   private static final String OUTPUT = "output";
+  static final String JOB_TOKEN_FILE="jobToken"; //localized file
   private JobConf originalConf;
   private JobConf fConf;
   private int maxMapSlots;
@@ -420,6 +423,10 @@ public class TaskTracker 
     this.taskLogsMonitor = t;
   }
 
+  public static String getUserDir(String user) {
+    return TaskTracker.SUBDIR + Path.SEPARATOR + user;
+  } 
+
   static String getCacheSubdir() {
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
   }
@@ -428,6 +435,11 @@ public class TaskTracker 
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
   }
 
+  public static String getLocalJobDir(String user, String jobid) {
+    return getUserDir(user) + Path.SEPARATOR + getJobCacheSubdir() 
+        + Path.SEPARATOR + jobid;
+  } 
+
   static String getLocalJobDir(String jobid) {
 	return getJobCacheSubdir() + Path.SEPARATOR + jobid; 
   }
@@ -440,6 +452,11 @@ public class TaskTracker 
 	return getLocalTaskDir(jobid, taskid) 
            + Path.SEPARATOR + TaskTracker.OUTPUT ; 
   }
+  
+  static String getLocalJobTokenFile(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+  }
+
 
   static String getLocalTaskDir(String jobid, 
                                 String taskid, 
@@ -890,8 +907,16 @@ public class TaskTracker 
         }
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
-        rjob.localized = true;
         rjob.jobConf = localJobConf;
+        // save local copy of JobToken file
+        localizeJobTokenFile(t.getUser(), jobId, localJobConf);       
+        FSDataInputStream in = localFs.open(new Path(
+            rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
+        JobTokens jt = new JobTokens();
+        jt.readFields(in); 
+        rjob.jobTokens = jt; // store JobToken object per job
+ 
+        rjob.localized = true;
         taskController.initializeJob(jobId);
       }
     }
@@ -2886,6 +2911,7 @@ public class TaskTracker 
     boolean localized;
     boolean keepJobFiles;
     FetchStatus f;
+    JobTokens jobTokens;
     RunningJob(JobID jobid) {
       this.jobid = jobid;
       localized = false;
@@ -3073,6 +3099,8 @@ public class TaskTracker 
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
 
+      verifyRequest(request, response, tracker, jobId);
+
       long startTime = 0;
       try {
         shuffleMetrics.serverHandlerBusy();
@@ -3181,7 +3209,58 @@ public class TaskTracker 
       outStream.close();
       shuffleMetrics.successOutput();
     }
+    
+    /**
+     * verify that the request carries the correct hash of the URL,
+     * and add a reply header containing the hash of that hash
+     * @param request
+     * @param response
+     * @param tracker the TaskTracker holding the per-job tokens
+     * @param jobId the job whose shuffle token is used for verification
+     * @throws IOException
+     */
+    private void verifyRequest(HttpServletRequest request, 
+        HttpServletResponse response, TaskTracker tracker, String jobId) 
+    throws IOException {
+      JobTokens jt = null;
+      synchronized (tracker.runningJobs) {
+        RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
+        if (rjob == null) {
+          throw new IOException("Unknown job " + jobId + "!!");
+        }
+        jt = rjob.jobTokens;
+      }
+      // string to hash (built from the request URL)
+      String enc_str = SecureShuffleUtils.buildMsgFrom(request);
+      
+      // hash from the fetcher
+      String urlHashStr = request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+      if(urlHashStr == null) {
+        response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+        throw new IOException("fetcher cannot be authenticated");
+      }
+      int len = urlHashStr.length();
+      LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
+          urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug
+
+      SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
+      // verify - throws exception
+      try {
+        ssutil.verifyReply(urlHashStr, enc_str);
+      } catch (IOException ioe) {
+        response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+        throw ioe;
+      }
+      
+      // verification passed - encode the reply
+      String reply = ssutil.generateHash(urlHashStr.getBytes());
+      response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+      
+      len = reply.length();
+      LOG.debug("Fetcher request verified. enc_str="+enc_str+";reply="
+          +reply.substring(len-len/2, len-1));
+    }
   }
+  
 
   // get the full paths of the directory in all the local disks.
   Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
@@ -3407,4 +3486,37 @@ public class TaskTracker 
     healthChecker = new NodeHealthCheckerService(conf);
     healthChecker.start();
   }
+  
+    /**
+     * Download the job-token file from the FS and save on local fs.
+     * @param user
+     * @param jobId
+     * @param jobConf
+     * The local path is stored in the jobConf (JobContext.JOB_TOKEN_FILE).
+     * @throws IOException
+     */
+    private void localizeJobTokenFile(String user, JobID jobId, JobConf jobConf)
+        throws IOException {
+      // check that the jobToken file is there
+      Path skPath = new Path(systemDirectory, 
+          jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME);
+      
+      FileStatus status = null;
+      long jobTokenSize = -1;
+      status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
+      jobTokenSize = status.getLen();
+      
+      Path localJobTokenFile =
+          lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user, 
+              jobId.toString()), jobTokenSize, fConf);
+    
+      LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() + 
+          " to " + localJobTokenFile.toUri().getPath());
+      
+      // Download job_token
+      systemFS.copyToLocalFile(skPath, localJobTokenFile);      
+      // set it into jobConf to transfer the name to TaskRunner
+      jobConf.set(JobContext.JOB_TOKEN_FILE,localJobTokenFile.toString());
+    }
+
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1077096&r1=1077095&r2=1077096&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Fri Mar  4 03:40:03 2011
@@ -47,6 +47,8 @@ public class JobContext {
 
   protected final org.apache.hadoop.mapred.JobConf conf;
   private final JobID jobId;
+
+  public static final String JOB_TOKEN_FILE = "mapreduce.job.jobTokenFile";
   
   public JobContext(Configuration conf, JobID jobId) {
     this.conf = new org.apache.hadoop.mapred.JobConf(conf);

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/JobTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/JobTokens.java?rev=1077096&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/JobTokens.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/JobTokens.java Fri Mar  4 03:40:03 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * get/set, store/load security keys
+ * key's value - byte[]
+ * store/load from DataInput/DataOutput
+ * List of currently stored keys:
+ *  jobToken for secure shuffle HTTP Get
+ *
+ */
+public class JobTokens implements Writable {
+  /**
+   * file name used on HDFS for generated keys
+   */
+  public static final String JOB_TOKEN_FILENAME = "jobTokens";
+
+  private byte [] shuffleJobToken = null; // jobtoken for shuffle (map output)
+
+  
+  /**
+   * returns the shuffle job token
+   * @return the shuffle job token key
+   */
+  public byte[] getShuffleJobToken() {
+    return shuffleJobToken;
+  }
+  
+  /**
+   * sets the jobToken
+   * @param key
+   */
+  public void setShuffleJobToken(byte[] key) {
+    shuffleJobToken = key;
+  }
+  
+  /**
+   * stores all the keys to DataOutput
+   * @param out
+   * @throws IOException
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeCompressedByteArray(out, shuffleJobToken);
+  }
+  
+  /**
+   * loads all the keys
+   * @param in
+   * @throws IOException
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    shuffleJobToken = WritableUtils.readCompressedByteArray(in);
+  }
+}
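
JobTokens is a plain Writable: the file the JobTracker drops into the job's
system directory is nothing more than the compressed token byte array, so a
write followed by readFields recovers the key, as Child.loadJobTokens and the
TaskTracker's job localization rely on. A self-contained sketch of that round
trip (buffered streams stand in for the FSDataOutputStream / FSDataInputStream
used by the real code):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.Arrays;
    import org.apache.hadoop.mapreduce.security.JobTokens;
    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;

    public class JobTokensRoundTrip {
      public static void main(String[] args) throws Exception {
        // JobTracker side (cf. JobInProgress.generateJobTokens): create and write
        JobTokens written = new JobTokens();
        written.setShuffleJobToken(SecureShuffleUtils.getNewEncodedKey());
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        written.write(new DataOutputStream(buf));

        // task side (cf. Child.loadJobTokens): read the same bytes back
        JobTokens read = new JobTokens();
        read.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));

        if (!Arrays.equals(written.getShuffleJobToken(),
                           read.getShuffleJobToken())) {
          throw new IllegalStateException("round trip lost the shuffle token");
        }
      }
    }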

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=1077096&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Fri Mar  4 03:40:03 2011
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URL;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.record.Utils;
+
+/**
+ * 
+ * utilities for generating keys and hashes, and for verifying them, for the shuffle
+ *
+ */
+public class SecureShuffleUtils {
+  public static final String HTTP_HEADER_URL_HASH = "UrlHash";
+  public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
+  public static KeyGenerator kg = null;
+  public static String DEFAULT_ALG="HmacSHA1";
+  
+  private SecretKeySpec secretKey;
+  private Mac mac;
+  
+  /**
+   * statically generate a new encoded key
+   * @return new encoded key
+   * @throws NoSuchAlgorithmException
+   */
+  public static byte[] getNewEncodedKey() throws NoSuchAlgorithmException{
+    SecretKeySpec key = generateKey(DEFAULT_ALG);
+    return key.getEncoded();
+  }
+  
+  private static SecretKeySpec generateKey(String alg) throws NoSuchAlgorithmException {
+    if(kg==null) {
+      kg = KeyGenerator.getInstance(alg);
+    }
+    return (SecretKeySpec) kg.generateKey();
+  }
+
+  /**
+   * Create a util object with alg and key
+   * @param sKeyEncoded
+   * @throws IOException if the MAC cannot be initialized
+   */
+  public SecureShuffleUtils(byte [] sKeyEncoded) 
+  throws  IOException{
+    secretKey = new SecretKeySpec(sKeyEncoded, SecureShuffleUtils.DEFAULT_ALG);
+    try {
+      mac = Mac.getInstance(DEFAULT_ALG);
+      mac.init(secretKey);
+    } catch (NoSuchAlgorithmException nae) {
+      throw new IOException(nae);
+    } catch( InvalidKeyException ie) {
+      throw new IOException(ie);
+    }
+  }
+  
+  /** 
+   * get key as byte[]
+   * @return encoded key
+   */
+  public byte [] getEncodedKey() {
+    return secretKey.getEncoded();
+  }
+  
+  /**
+   * Base64 encoded hash of msg
+   * @param msg
+   * @return Base64-encoded hash of msg
+   */
+  public String generateHash(byte[] msg) {
+    return new String(Base64.encodeBase64(generateByteHash(msg)));
+  }
+  
+  /**
+   * calculate hash of msg
+   * @param msg
+   * @return the HMAC of msg
+   */
+  private byte[] generateByteHash(byte[] msg) {
+    return mac.doFinal(msg);
+  }
+  
+  /**
+   * verify that hash equals HMacHash(msg)
+   * @param hash
+   * @param msg
+   * @return true if they are the same
+   */
+  private boolean verifyHash(byte[] hash, byte[] msg) {
+    byte[] msg_hash = generateByteHash(msg);
+    return Utils.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
+  }
+  
+  /**
+   * Aux util to calculate hash of a String
+   * @param enc_str
+   * @return Base64 encodedHash
+   * @throws IOException
+   */
+  public String hashFromString(String enc_str) 
+  throws IOException {
+    return generateHash(enc_str.getBytes()); 
+  }
+  
+  /**
+   * verify that base64Hash is same as HMacHash(msg)  
+   * @param base64Hash (Base64 encoded hash)
+   * @param msg
+   * @throws IOException if not the same
+   */
+  public void verifyReply(String base64Hash, String msg)
+  throws IOException {
+    byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
+    
+    boolean res = verifyHash(hash, msg.getBytes());
+    
+    if(res != true) {
+      throw new IOException("Verification of the hashReply failed");
+    }
+  }
+  
+  /**
+   * Shuffle specific utils - build string for encoding from URL
+   * @param url
+   * @return string for encoding
+   */
+  public static String buildMsgFrom(URL url) {
+    return buildMsgFrom(url.getPath(), url.getQuery(), url.getPort());
+  }
+  /**
+   * Shuffle specific utils - build string for encoding from URL
+   * @param request
+   * @return string for encoding
+   */
+  public static String buildMsgFrom(HttpServletRequest request ) {
+    return buildMsgFrom(request.getRequestURI(), request.getQueryString(),
+        request.getLocalPort());
+  }
+  /**
+   * Shuffle specific utils - build string for encoding from URL
+   * @param uri_path
+   * @param uri_query
+   * @param port
+   * @return string for encoding
+   */
+  private static String buildMsgFrom(String uri_path, String uri_query, int port) {
+    return String.valueOf(port) + uri_path + "?" + uri_query;
+  }
+  
+  
+  /**
+   * byte array to Hex String
+   * @param ba
+   * @return string with HEX value of the key
+   */
+  public static String toHex(byte[] ba) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintStream ps = new PrintStream(baos);
+    for(byte b: ba) {
+      ps.printf("%x", b);
+    }
+    return baos.toString();
+  }
+}
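
Note which string actually gets MACed: not the full URL, but port + path +
"?" + query, assembled identically on both ends (url.getPort() in the
fetcher, request.getLocalPort() in the servlet), which keeps the value
independent of the host name, the one component the two sides would render
differently. A tiny illustration with a made-up host and job id:

    import java.net.URL;
    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;

    public class BuildMsgDemo {
      public static void main(String[] args) throws Exception {
        URL u = new URL(
            "http://tt-host:8080/mapOutput?job=job_001&reduce=0&map=attempt_001");
        // prints "8080/mapOutput?job=job_001&reduce=0&map=attempt_001"
        System.out.println(SecureShuffleUtils.buildMsgFrom(u));
      }
    }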

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java?rev=1077096&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestShuffleJobToken.java Fri Mar  4 03:40:03 2011
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+public class TestShuffleJobToken {
+  private static HttpServer server;
+  private static URL baseUrl;
+  private static File dir;
+  private static final String JOB_ID = "job_20091117075357176_0001";
+  
+  // create fake url
+  private URL getMapOutputURL(String host)  throws MalformedURLException {
+    // Get the base url
+    StringBuffer url = new StringBuffer(host);
+    url.append("mapOutput?");
+    url.append("job=" + JOB_ID + "&");
+    url.append("reduce=0&");
+    url.append("map=attempt");
+
+    return new URL(url.toString());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    dir = new File(System.getProperty("build.webapps", "build/webapps") + "/test");
+    System.out.println("dir="+dir.getAbsolutePath());
+    if(!dir.exists()) {
+      assertTrue(dir.mkdirs());
+    }
+    server = new HttpServer("test", "0.0.0.0", 0, true);
+    server.addServlet("shuffle", "/mapOutput", TaskTracker.MapOutputServlet.class);
+    server.start();
+    int port = server.getPort();
+    baseUrl = new URL("http://localhost:" + port + "/");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if(dir.exists())
+      dir.delete();
+    if(server!=null)
+      server.stop();
+  }
+
+  
+  /**
+   * exercise the positive case and a negative case with an invalid urlHash
+   */
+  @Test
+  public void testInvalidJobToken()
+  throws IOException, GeneralSecurityException {
+    
+    URL url = getMapOutputURL(baseUrl.toString());
+    String enc_str = SecureShuffleUtils.buildMsgFrom(url);
+    URLConnection connectionGood = url.openConnection();
+
+    // create key 
+    byte [] key= SecureShuffleUtils.getNewEncodedKey();
+    
+    // create fake TaskTracker - needed for keys storage
+    JobTokens jt = new JobTokens();
+    jt.setShuffleJobToken(key);
+    TaskTracker tt  = new TaskTracker();
+    addJobToken(tt, JOB_ID, jt); // fake id
+    server.setAttribute("task.tracker", tt);
+
+    // encode the url
+    SecureShuffleUtils mac = new SecureShuffleUtils(key);
+    String urlHashGood = mac.generateHash(enc_str.getBytes()); // valid hash
+    
+    // another key, not known to the job
+    byte [] badKey= SecureShuffleUtils.getNewEncodedKey();
+    mac = new SecureShuffleUtils(badKey);
+    String urlHashBad = mac.generateHash(enc_str.getBytes()); // invalid hash 
+    
+    // put url hash into http header
+    connectionGood.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHashGood);
+    
+    // valid url hash should not fail with security error
+    try {
+      connectionGood.getInputStream();
+    } catch (IOException ie) {
+      String msg = ie.getLocalizedMessage();
+      if(msg.contains("Server returned HTTP response code: 401 for URL:")) {
+        fail("securtity failure with valid urlHash:"+ie);
+      }
+      System.out.println("valid urlhash passed validation");
+    } 
+    // invalid url hash
+    URLConnection connectionBad = url.openConnection();
+    connectionBad.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHashBad);
+    
+    try {
+      connectionBad.getInputStream();
+      fail("Connection should've failed because of invalid urlHash");
+    } catch (IOException ie) {
+      String msg = ie.getLocalizedMessage();
+      if(!msg.contains("Server returned HTTP response code: 401 for URL:")) {
+        fail("connection failed with other then validation error:"+ie);
+      }
+      System.out.println("validation worked, failed with:"+ie);
+    } 
+  }
+  /* Test helper: registers a RunningJob with its job tokens on the TaskTracker. */
+  void addJobToken(TaskTracker tt, String jobIdStr, JobTokens jt) {
+    JobID jobId = JobID.forName(jobIdStr);
+    TaskTracker.RunningJob rJob = new TaskTracker.RunningJob(jobId);
+    rJob.jobTokens = jt;
+    synchronized (tt.runningJobs) {
+      tt.runningJobs.put(jobId, rJob);
+    }
+  }
+
+}


