hadoop-mapreduce-commits mailing list archives

From d...@apache.org
Subject svn commit: r892893 [3/3] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ src...
Date Mon, 21 Dec 2009 17:36:48 GMT
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java Mon Dec 21 17:36:44 2009
@@ -17,25 +17,81 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
-
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.util.ToolRunner;
 
 import junit.framework.TestCase;
 
+/**
+ * Test job submission. This test checks:
+ *   - basic   : job submission via JobClient
+ *   - cleanup : the job client crashes while submitting
+ *   - invalid job config
+ *     - invalid memory config
+ */
 public class TestSubmitJob extends TestCase {
-  private MiniMRCluster miniMRCluster;
-
-  @Override
-  protected void tearDown()
-      throws Exception {
-    if (miniMRCluster != null) {
-      miniMRCluster.shutdown();
-    }
+  static final Log LOG = LogFactory.getLog(TestSubmitJob.class);
+  
+  private MiniMRCluster mrCluster;
+
+  private MiniDFSCluster dfsCluster;
+  private JobTracker jt;
+  private FileSystem fs;
+  private static Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data","/tmp"), 
+             "job-submission-testing");
+  private static int numSlaves = 1;
+
+  private void startCluster() throws Exception {
+    super.setUp();
+    Configuration conf = new Configuration();
+    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
+    JobConf jConf = new JobConf(conf);
+    jConf.setLong("mapred.job.submission.expiry.interval", 6 * 1000);
+    mrCluster = new MiniMRCluster(0, 0, numSlaves, 
+        dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, 
+        jConf);
+    jt = mrCluster.getJobTrackerRunner().getJobTracker();
+    fs = FileSystem.get(mrCluster.createJobConf());
+  }
+  
+  private void stopCluster() throws Exception {
+    mrCluster.shutdown();
+    mrCluster = null;
+    dfsCluster.shutdown();
+    dfsCluster = null;
+    jt = null;
+    fs = null;
   }
 
   /**
@@ -56,9 +112,9 @@
     jtConf.setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB,
         4 * 1024L);
 
-    miniMRCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
+    mrCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
 
-    JobConf clusterConf = miniMRCluster.createJobConf();
+    JobConf clusterConf = mrCluster.createJobConf();
 
     // No map-memory configuration
     JobConf jobConf = new JobConf(clusterConf);
@@ -85,6 +141,9 @@
     jobConf.setMemoryForReduceTask(5 * 1024L);
     runJobAndVerifyFailure(jobConf, 1 * 1024L, 5 * 1024L,
         "Exceeds the cluster's max-memory-limit.");
+    
+    mrCluster.shutdown();
+    mrCluster = null;
   }
 
   private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks,
@@ -110,4 +169,127 @@
         + " - doesn't contain expected message - " + overallExpectedMsg, msg
         .contains(overallExpectedMsg));
   }
-}
+  
+  static ClientProtocol getJobSubmitClient(JobConf conf, 
+                                           UserGroupInformation ugi) 
+  throws IOException {
+    return (ClientProtocol) RPC.getProxy(ClientProtocol.class, 
+        ClientProtocol.versionID, JobTracker.getAddress(conf), ugi, 
+        conf, NetUtils.getSocketFactory(conf, ClientProtocol.class));
+  }
+
+  static org.apache.hadoop.hdfs.protocol.ClientProtocol getDFSClient(
+      Configuration conf, UserGroupInformation ugi) 
+  throws IOException {
+    return (org.apache.hadoop.hdfs.protocol.ClientProtocol) 
+      RPC.getProxy(org.apache.hadoop.hdfs.protocol.ClientProtocol.class, 
+        org.apache.hadoop.hdfs.protocol.ClientProtocol.versionID, 
+        NameNode.getAddress(conf), ugi, 
+        conf, 
+        NetUtils.getSocketFactory(conf, 
+            org.apache.hadoop.hdfs.protocol.ClientProtocol.class));
+  }
+  
+  /**
+   * Submit a job and check if the files are accessible to other users.
+   */
+  public void testSecureJobExecution() throws Exception {
+    LOG.info("Testing secure job submission/execution");
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      UnixUserGroupInformation.saveToConf(conf,
+      UnixUserGroupInformation.UGI_PROPERTY_NAME, 
+      TestMiniMRWithDFSWithDistinctUsers.DFS_UGI);
+      dfs = new MiniDFSCluster(conf, 1, true, null);
+      FileSystem fs = dfs.getFileSystem();
+      TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/user");
+      TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/mapred");
+      TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, 
+          conf.get(JTConfig.JT_STAGING_AREA_ROOT));
+      UnixUserGroupInformation MR_UGI = 
+        TestMiniMRWithDFSWithDistinctUsers.createUGI(
+           UnixUserGroupInformation.login().getUserName(), false); 
+      mr = new MiniMRCluster(0, 0, 1, dfs.getFileSystem().getUri().toString(),
+                             1, null, null, MR_UGI);
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+
+      // cleanup
+      dfs.getFileSystem().delete(TEST_DIR, true);
+
+      final Path mapSignalFile = new Path(TEST_DIR, "map-signal");
+      final Path reduceSignalFile = new Path(TEST_DIR, "reduce-signal");
+      
+      // create a ugi for user 1
+      UnixUserGroupInformation user1 = 
+        TestMiniMRWithDFSWithDistinctUsers.createUGI("user1", false);
+      Path inDir = new Path("/user/input");
+      Path outDir = new Path("/user/output");
+      JobConf job = 
+      TestMiniMRWithDFSWithDistinctUsers.createJobConf(mr, user1);
+
+      UtilsForTests.configureWaitingJobConf(job, inDir, outDir, 2, 0, 
+        "test-submit-job", mapSignalFile.toString(), 
+        reduceSignalFile.toString());
+      job.set(UtilsForTests.getTaskSignalParameter(true), 
+      mapSignalFile.toString());
+      job.set(UtilsForTests.getTaskSignalParameter(false), 
+      reduceSignalFile.toString());
+      LOG.info("Submit job as the actual user (" + user1.getUserName() + ")");
+      JobClient jClient = new JobClient(job);
+      RunningJob rJob = jClient.submitJob(job);
+      JobID id = rJob.getID();
+      LOG.info("Running job " + id);
+
+      // create user2
+      UnixUserGroupInformation user2 = 
+        TestMiniMRWithDFSWithDistinctUsers.createUGI("user2", false);
+      JobConf conf_other = 
+      TestMiniMRWithDFSWithDistinctUsers.createJobConf(mr, user2);
+      org.apache.hadoop.hdfs.protocol.ClientProtocol client = 
+        getDFSClient(conf_other, user2);
+
+      // try accessing mapred.system.dir/jobid/*
+      boolean failed = false;
+      try {
+        Path path = new Path(new URI(jt.getSystemDir()).getPath());
+        LOG.info("Try listing the mapred-system-dir as the user (" 
+                 + user2.getUserName() + ")");
+        client.getListing(path.toString());
+      } catch (IOException ioe) {
+        failed = true;
+      }
+      assertTrue("JobTracker system dir is accessible to others", failed);
+      // try accessing ~/.staging/jobid/*
+      failed = false;
+      JobInProgress jip = jt.getJob(id);
+      Path jobSubmitDirpath = 
+        new Path(jip.getJobConf().get("mapreduce.job.dir"));
+      try {
+        LOG.info("Try accessing the job folder for job " + id + " as the user (" 
+                 + user2.getUserName() + ")");
+        client.getListing(jobSubmitDirpath.toString());
+      } catch (IOException ioe) {
+        failed = true;
+      }
+      assertTrue("User's staging folder is accessible to others", failed);
+      UtilsForTests.signalTasks(dfs, fs, true, mapSignalFile.toString(), 
+      reduceSignalFile.toString());
+      // wait for job to be done
+      UtilsForTests.waitTillDone(jClient);
+
+      // check if the staging area is cleaned up
+      LOG.info("Check if job submit dir is cleanup or not");
+      assertFalse(fs.exists(jobSubmitDirpath));
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+}
\ No newline at end of file

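The getJobSubmitClient/getDFSClient helpers above build RPC proxies bound to a specific UserGroupInformation, which is what lets testSecureJobExecution probe the system and staging directories as a second user. A minimal sketch of the access-check pattern the test repeats twice, assuming the same pre-security-refactor HDFS ClientProtocol the test itself imports (the helper name is hypothetical):

    // Listing a path as a foreign user should fail; the IOException is the
    // expected (passing) outcome, so it is recorded rather than rethrown.
    static void assertNotListable(
        org.apache.hadoop.hdfs.protocol.ClientProtocol dfsClient,
        String path, String message) {
      boolean denied = false;
      try {
        dfsClient.getListing(path);
      } catch (IOException ioe) {
        denied = true;
      }
      junit.framework.Assert.assertTrue(message, denied);
    }
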
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Mon Dec 21 17:36:44 2009
@@ -163,7 +163,7 @@
     taskId =
         new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
     task =
-        new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1);
+        new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
     task.setConf(job.getConfiguration()); // Set conf. Set user name in particular.
 
     // create jobTokens file

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Mon Dec 21 17:36:44 2009
@@ -259,7 +259,9 @@
     while (true) {
       boolean shouldWait = false;
       for (JobStatus jobStatuses : jobClient.getAllJobs()) {
-        if (jobStatuses.getRunState() == JobStatus.RUNNING) {
+        if (jobStatuses.getRunState() != JobStatus.SUCCEEDED
+            && jobStatuses.getRunState() != JobStatus.FAILED
+            && jobStatuses.getRunState() != JobStatus.KILLED) {
           shouldWait = true;
           break;
         }

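The widened condition in UtilsForTests fixes the wait loop: the old check only waited while a job was RUNNING, so a job still sitting in PREP was treated as done. A minimal sketch of the predicate the new loop encodes (the helper name is hypothetical):

    // A job is finished only once it reaches a terminal run state.
    static boolean isTerminal(int runState) {
      return runState == JobStatus.SUCCEEDED
          || runState == JobStatus.FAILED
          || runState == JobStatus.KILLED;
    }

With this helper the loop waits while any job satisfies !isTerminal(jobStatuses.getRunState()), which matches the three-way check in the hunk above.
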
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java Mon Dec 21 17:36:44 2009
@@ -150,13 +150,13 @@
     } else if (null != conf.getClass(INDIRECT_INPUT_FORMAT, null)) {
       // specified IndirectInputFormat? Build src list
       JobClient jClient = new JobClient(conf);  
-      Path sysdir = jClient.getSystemDir();
+      Path tmpDir = new Path("/tmp");
       Random r = new Random();
-      Path indirInputFile = new Path(sysdir,
+      Path indirInputFile = new Path(tmpDir,
           Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
       conf.set(INDIRECT_INPUT_FILE, indirInputFile.toString());
       SequenceFile.Writer writer = SequenceFile.createWriter(
-          sysdir.getFileSystem(conf), conf, indirInputFile,
+          tmpDir.getFileSystem(conf), conf, indirInputFile,
           LongWritable.class, Text.class,
           SequenceFile.CompressionType.NONE);
       try {

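This hunk moves the generator's scratch file for the indirect-input list out of jClient.getSystemDir(): with this change set the JobTracker system directory is no longer writable by ordinary job clients, so a shared location is used instead. A minimal sketch of how the writer created above is typically used, with illustrative values (the record layout is an assumption, not taken from this diff):

    // Append (LongWritable, Text) records to the src list, then close.
    try {
      writer.append(new LongWritable(0L), new Text("hdfs:///illustrative/input"));
    } finally {
      writer.close();
    }
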
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCh.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCh.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCh.java Mon Dec 21 17:36:44 2009
@@ -24,6 +24,8 @@
 import java.util.List;
 import java.util.Stack;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +47,10 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -420,10 +426,21 @@
     return Math.max(numMaps, 1);
   }
 
-  private boolean setup(List<FileOperation> ops, Path log) throws IOException {
+  private boolean setup(List<FileOperation> ops, Path log) 
+  throws IOException {
     final String randomId = getRandomId();
     JobClient jClient = new JobClient(jobconf);
-    Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
+    Path stagingArea;
+    try {
+      stagingArea = JobSubmissionFiles.getStagingDir(
+                       jClient.getClusterHandle(), jobconf);
+    } catch (InterruptedException ie){
+      throw new IOException(ie);
+    }
+    Path jobdir = new Path(stagingArea + NAME + "_" + randomId);
+    FsPermission mapredSysPerms = 
+      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+    FileSystem.mkdirs(jClient.getFs(), jobdir, mapredSysPerms);
     LOG.info(JOB_DIR_LABEL + "=" + jobdir);
 
     if (log == null) {

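setup() now stages its job directory under the per-user staging area rather than the JobTracker system directory, and creates it explicitly with JOB_DIR_PERMISSION. The DistCp and HadoopArchives hunks below apply the identical sequence, so a shared helper is a natural refactoring; a minimal sketch, assuming only the APIs this patch already uses (the helper name is hypothetical):

    // Resolve the staging area, derive a per-run job dir, and create it
    // with the restrictive job-dir permission.
    private static Path createJobDir(JobClient jClient, JobConf jobConf,
                                     String name, String randomId)
        throws IOException {
      Path stagingArea;
      try {
        stagingArea =
            JobSubmissionFiles.getStagingDir(jClient.getClusterHandle(), jobConf);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
      // The two-argument Path constructor inserts the path separator, as the
      // HadoopArchives hunk does; the DistCh and DistCp hunks concatenate
      // the strings directly.
      Path jobDir = new Path(stagingArea, name + "_" + randomId);
      FsPermission perms =
          new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
      FileSystem.mkdirs(jClient.getFs(), jobDir, perms);
      return jobDir;
    }
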
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Mon Dec 21 17:36:44 2009
@@ -34,6 +34,8 @@
 import java.util.Stack;
 import java.util.StringTokenizer;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -66,7 +68,10 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -1196,7 +1201,18 @@
 
     final String randomId = getRandomId();
     JobClient jClient = new JobClient(jobConf);
-    Path jobDirectory = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
+    Path stagingArea;
+    try {
+      stagingArea = 
+        JobSubmissionFiles.getStagingDir(jClient.getClusterHandle(), conf);
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+    
+    Path jobDirectory = new Path(stagingArea + NAME + "_" + randomId);
+    FsPermission mapredSysPerms = 
+      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+    FileSystem.mkdirs(jClient.getFs(), jobDirectory, mapredSysPerms);
     jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
 
     FileSystem dstfs = args.dst.getFileSystem(conf);

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java Mon Dec 21 17:36:44 2009
@@ -29,6 +29,8 @@
 import java.util.Set;
 import java.util.TreeMap;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +40,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HarFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -56,7 +59,12 @@
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -359,12 +367,22 @@
     }
     conf.set(DST_DIR_LABEL, outputPath.toString());
     final String randomId = DistCp.getRandomId();
-    Path jobDirectory = new Path(new JobClient(conf).getSystemDir(),
+    Path stagingArea;
+    try {
+      stagingArea = JobSubmissionFiles.getStagingDir(new Cluster(conf), 
+          conf);
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+    Path jobDirectory = new Path(stagingArea,
                           NAME + "_" + randomId);
+    FsPermission mapredSysPerms = 
+      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+    FileSystem.mkdirs(jobDirectory.getFileSystem(conf), jobDirectory,
+                      mapredSysPerms);
     conf.set(JOB_DIR_LABEL, jobDirectory.toString());
     //get a tmp directory for input splits
     FileSystem jobfs = jobDirectory.getFileSystem(conf);
-    jobfs.mkdirs(jobDirectory);
     Path srcFiles = new Path(jobDirectory, "_har_src_files");
     conf.set(SRC_LIST_LABEL, srcFiles.toString());
     SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,


