hadoop-mapreduce-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r903227 [12/16] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ src/benchmarks/gridmix/ src/benchmarks/gridmix/javasort/ src/benchmarks/gridmix/maxent/ src/benchmarks/gridmix/monsterQuery/ src/benchmarks/gri...
Date: Tue, 26 Jan 2010 14:03:09 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java Tue Jan 26 14:02:53 2010
@@ -28,6 +28,7 @@
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.AdminOperationsProtocol;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.util.Tool;
@@ -53,16 +54,20 @@
   private static void printHelp(String cmd) {
     String summary = "hadoop mradmin is the command to execute Map-Reduce administrative commands.\n" +
     "The full syntax is: \n\n" +
-    "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] [-help [cmd]] "
-    + "[-refreshNodes]\n"; 
+    "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] " +
+    "[-refreshNodes] [-refreshUserToGroupsMappings] [-help [cmd]]\n"; 
 
   String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
     "\t\tJobtracker will reload the authorization policy file.\n";
 
   String refreshQueues =
-        "-refreshQueues: Reload the queue acls and state.\n"
-            + "\t\tJobTracker will reload the mapred-queues.xml file.\n";
+        "-refreshQueues: Reload the queues' acls, states and "
+            + "scheduler specific properties.\n"
+            + "\t\tJobTracker will reload the mapred-queues configuration file.\n";
 
+  String refreshUserToGroupsMappings = 
+    "-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
+  
   String refreshNodes =
     "-refreshNodes: Refresh the hosts information at the jobtracker.\n";
   
@@ -73,6 +78,8 @@
     System.out.println(refreshServiceAcl);
   } else if ("refreshQueues".equals(cmd)) {
     System.out.println(refreshQueues);
+  } else if ("refreshUserToGroupsMappings".equals(cmd)) {
+    System.out.println(refreshUserToGroupsMappings);
   }  else if ("refreshNodes".equals(cmd)) {
     System.out.println(refreshNodes);
   } else if ("help".equals(cmd)) {
@@ -98,12 +105,15 @@
       System.err.println("Usage: java MRAdmin" + " [-refreshServiceAcl]");
     } else if ("-refreshQueues".equals(cmd)) {
       System.err.println("Usage: java MRAdmin" + " [-refreshQueues]");
+    } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+      System.err.println("Usage: java MRAdmin" + " [-refreshUserToGroupsMappings]");
     } else if ("-refreshNodes".equals(cmd)) {
       System.err.println("Usage: java MRAdmin" + " [-refreshNodes]");
     } else {
       System.err.println("Usage: java MRAdmin");
       System.err.println("           [-refreshServiceAcl]");
       System.err.println("           [-refreshQueues]");
+      System.err.println("           [-refreshUserToGroupsMappings]");
       System.err.println("           [-refreshNodes]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
@@ -142,6 +152,29 @@
     return 0;
   }
 
+  /**
+   * Refresh the user-to-groups mappings on the {@link JobTracker}.
+   * @return exitcode 0 on success, non-zero on failure
+   * @throws IOException
+   */
+  private int refreshUserToGroupsMappings() throws IOException {
+    // Get the current configuration
+    Configuration conf = getConf();
+    // Create the client
+    RefreshUserToGroupMappingsProtocol refreshProtocol = 
+      (RefreshUserToGroupMappingsProtocol) 
+      RPC.getProxy(RefreshUserToGroupMappingsProtocol.class, 
+                   RefreshUserToGroupMappingsProtocol.versionID, 
+                   JobTracker.getAddress(conf), getUGI(conf), conf,
+                   NetUtils.getSocketFactory(conf, 
+                                             RefreshUserToGroupMappingsProtocol.class));
+    
+    // Refresh the user-to-groups mappings
+    refreshProtocol.refreshUserToGroupsMappings(conf);
+    
+    return 0;
+  }
+  
   private int refreshQueues() throws IOException {
     // Get the current configuration
     Configuration conf = getConf();
@@ -196,12 +229,11 @@
     int exitCode = -1;
     int i = 0;
     String cmd = args[i++];
-
     //
     // verify that we have enough command line parameters
     //
-    if ("-refreshServiceAcl".equals(cmd) || "-refreshQueues".equals(cmd)
-        || "-refreshNodes".equals(cmd)) {
+    if ("-refreshServiceAcl".equals(cmd) || "-refreshQueues".equals(cmd) ||
+        "-refreshNodes".equals(cmd) || "-refreshUserToGroupsMappings".equals(cmd)) {
       if (args.length != 1) {
         printUsage(cmd);
         return exitCode;
@@ -214,6 +246,8 @@
         exitCode = refreshAuthorizationPolicy();
       } else if ("-refreshQueues".equals(cmd)) {
         exitCode = refreshQueues();
+      } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+        exitCode = refreshUserToGroupsMappings();
       } else if ("-refreshNodes".equals(cmd)) {
         exitCode = refreshNodes();
       } else if ("-help".equals(cmd)) {
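
The new option rides on the existing Tool plumbing, so it can be driven programmatically as well as from the shell. A minimal sketch, assuming a Configuration pointing at a running JobTracker (the class name and exit handling are illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.tools.MRAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class RefreshMappingsDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Equivalent to: hadoop mradmin -refreshUserToGroupsMappings
        int exitCode = ToolRunner.run(conf, new MRAdmin(),
            new String[] { "-refreshUserToGroupsMappings" });
        System.exit(exitCode);  // 0 on success, non-zero on failure
      }
    }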

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Cluster.java Tue Jan 26 14:02:53 2010
@@ -46,6 +46,7 @@
   private Configuration conf;
   private FileSystem fs = null;
   private Path sysDir = null;
+  private Path stagingAreaDir = null;
   private Path jobHistoryDir = null;
 
   static {
@@ -76,6 +77,7 @@
     ClientProtocol client;
     String tracker = conf.get("mapred.job.tracker", "local");
     if ("local".equals(tracker)) {
+      conf.setInt("mapreduce.job.maps", 1);
       client = new LocalJobRunner(conf);
     } else {
       client = createRPCProxy(JobTracker.getAddress(conf), conf);
@@ -222,6 +224,19 @@
     }
     return sysDir;
   }
+  
+  /**
+   * Grab the jobtracker's view of the staging directory path where 
+   * job-specific files will be placed.
+   * 
+   * @return the staging directory where job-specific files are to be placed.
+   */
+  public Path getStagingAreaDir() throws IOException, InterruptedException {
+    if (stagingAreaDir == null) {
+      stagingAreaDir = new Path(client.getStagingAreaDir());
+    }
+    return stagingAreaDir;
+  }
 
   /**
    * Get the job history file path for a given job id. The job history file at 
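
Cluster.getStagingAreaDir() asks the jobtracker once and then serves the cached Path. A minimal caller sketch, assuming this branch's single-argument Cluster(Configuration) constructor:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Cluster;

    public class StagingDirPeek {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Cluster cluster = new Cluster(conf);
        // First call fetches from the cluster; later calls return the cached Path.
        Path staging = cluster.getStagingAreaDir();
        System.out.println("staging area: " + staging);
      }
    }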

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java Tue Jan 26 14:02:53 2010
@@ -20,8 +20,6 @@
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,7 +28,6 @@
 import java.io.OutputStreamWriter;
 import java.net.URL;
 import java.net.URLConnection;
-import java.util.Arrays;
 import java.net.URI;
 
 import javax.security.auth.login.LoginException;
@@ -40,12 +37,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
@@ -956,7 +950,7 @@
     ensureState(JobState.DEFINE);
     setUseNewAPI();
     status = new JobSubmitter(cluster.getFileSystem(),
-      cluster.getClient()).submitJobInternal(this);
+      cluster.getClient()).submitJobInternal(this, cluster);
     state = JobState.RUNNING;
    }
   
@@ -1033,12 +1027,52 @@
     return isSuccessful();
   }
 
+  /**
+   * @return true if the profile parameters indicate that this is using
+   * hprof, which generates profile files in a particular location
+   * that we can retrieve to the client.
+   */
+  private boolean shouldDownloadProfile() {
+    // Check the argument string that was used to initialize profiling.
+    // If this indicates hprof and file-based output, then we're ok to
+    // download.
+    String profileParams = getProfileParams();
+
+    if (null == profileParams) {
+      return false;
+    }
+
+    // Split this on whitespace.
+    String [] parts = profileParams.split("[ \\t]+");
+
+    // If any of these indicate hprof, and the use of output files, return true.
+    boolean hprofFound = false;
+    boolean fileFound = false;
+    for (String p : parts) {
+      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
+        hprofFound = true;
+
+        // This contains a number of comma-delimited components, one of which
+        // may specify the file to write to. Make sure this is present and
+        // not empty.
+        String [] subparts = p.split(",");
+        for (String sub : subparts) {
+          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
+            fileFound = true;
+          }
+        }
+      }
+    }
+
+    return hprofFound && fileFound;
+  }
+
   private void printTaskEvents(TaskCompletionEvent[] events,
       Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
       IntegerRanges reduceRanges) throws IOException, InterruptedException {
     for (TaskCompletionEvent event : events) {
       TaskCompletionEvent.Status status = event.getStatus();
-      if (profiling && 
+      if (profiling && shouldDownloadProfile() &&
          (status == TaskCompletionEvent.Status.SUCCEEDED ||
             status == TaskCompletionEvent.Status.FAILED) &&
             (event.isMapTask() ? mapRanges : reduceRanges).
@@ -1211,106 +1245,4 @@
     }
     return ugi;
   }
-
-  /**
-   * Read a splits file into a list of raw splits.
-   * 
-   * @param in the stream to read from
-   * @return the complete list of splits
-   * @throws IOException
-   */
-  public static RawSplit[] readSplitFile(DataInput in) throws IOException {
-    byte[] header = new byte[JobSubmitter.SPLIT_FILE_HEADER.length];
-    in.readFully(header);
-    if (!Arrays.equals(JobSubmitter.SPLIT_FILE_HEADER, header)) {
-      throw new IOException("Invalid header on split file");
-    }
-    int vers = WritableUtils.readVInt(in);
-    if (vers != JobSubmitter.CURRENT_SPLIT_FILE_VERSION) {
-      throw new IOException("Unsupported split version " + vers);
-    }
-    int len = WritableUtils.readVInt(in);
-    RawSplit[] result = new RawSplit[len];
-    for (int i=0; i < len; ++i) {
-      result[i] = new RawSplit();
-      result[i].readFields(in);
-    }
-    return result;
-  }
-
-  public static class RawSplit implements Writable {
-    private String splitClass;
-    private BytesWritable bytes = new BytesWritable();
-    private String[] locations;
-    long dataLength;
-
-    public RawSplit() {
-    }
-    
-    protected RawSplit(String splitClass, BytesWritable bytes,
-        String[] locations, long dataLength) {
-      this.splitClass = splitClass;
-      this.bytes = bytes;
-      this.locations = locations;
-      this.dataLength = dataLength;
-    }
-
-    public void setBytes(byte[] data, int offset, int length) {
-      bytes.set(data, offset, length);
-    }
-
-    public void setClassName(String className) {
-      splitClass = className;
-    }
-      
-    public String getClassName() {
-      return splitClass;
-    }
-      
-    public BytesWritable getBytes() {
-      return bytes;
-    }
-
-    public void clearBytes() {
-      bytes = null;
-    }
-      
-    public void setLocations(String[] locations) {
-      this.locations = locations;
-    }
-      
-    public String[] getLocations() {
-      return locations;
-    }
-
-    public long getDataLength() {
-      return dataLength;
-    }
-
-    public void setDataLength(long l) {
-      dataLength = l;
-    }
-      
-    public void readFields(DataInput in) throws IOException {
-      splitClass = Text.readString(in);
-      dataLength = in.readLong();
-      bytes.readFields(in);
-      int len = WritableUtils.readVInt(in);
-      locations = new String[len];
-      for (int i=0; i < len; ++i) {
-        locations[i] = Text.readString(in);
-      }
-    }
-      
-    public void write(DataOutput out) throws IOException {
-      Text.writeString(out, splitClass);
-      out.writeLong(dataLength);
-      bytes.write(out);
-      WritableUtils.writeVInt(out, locations.length);
-      for (int i = 0; i < locations.length; i++) {
-        Text.writeString(out, locations[i]);
-      }        
-    }
-  }
-
 }
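
The download gate added above only passes for hprof-style agent arguments that name a non-empty output file. The same rule, restated as a standalone check with sample inputs (the class and method names are illustrative):

    public class HprofParamCheck {
      // Re-statement of the rule in Job#shouldDownloadProfile(): profiles are
      // only fetched when the params select hprof AND name a non-empty file.
      static boolean hprofWithFile(String profileParams) {
        if (profileParams == null) {
          return false;
        }
        boolean hprofFound = false;
        boolean fileFound = false;
        for (String p : profileParams.split("[ \\t]+")) {
          if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
            hprofFound = true;
            for (String sub : p.split(",")) {
              if (sub.startsWith("file=") && sub.length() > "file=".length()) {
                fileFound = true;
              }
            }
          }
        }
        return hprofFound && fileFound;
      }

      public static void main(String[] args) {
        System.out.println(hprofWithFile("-agentlib:hprof=cpu=samples,file=%s")); // true
        System.out.println(hprofWithFile("-Xrunhprof:heap=sites"));               // false
      }
    }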

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Jan 26 14:02:53 2010
@@ -55,6 +55,7 @@
   public static final String JAR = "mapreduce.job.jar";
   public static final String ID = "mapreduce.job.id";
   public static final String JOB_NAME = "mapreduce.job.name";
+  public static final String JAR_UNPACK_PATTERN = "mapreduce.job.jar.unpack.pattern";
   public static final String USER_NAME = "mapreduce.job.user.name";
   public static final String PRIORITY = "mapreduce.job.priority";
   public static final String QUEUE_NAME = "mapreduce.job.queuename";
@@ -106,6 +107,10 @@
     "mapreduce.job.cache.files.timestamps";
   public static final String CACHE_ARCHIVES_TIMESTAMPS = 
     "mapreduce.job.cache.archives.timestamps";
+  public static final String CACHE_FILE_VISIBILITIES = 
+    "mapreduce.job.cache.files.visibilities";
+  public static final String CACHE_ARCHIVES_VISIBILITIES = 
+    "mapreduce.job.cache.archives.visibilities";
   public static final String CACHE_SYMLINK = 
     "mapreduce.job.cache.symlink.create";
   
@@ -162,7 +167,7 @@
     "mapreduce.map.skip.proc-count.auto-incr";
   public static final String MAP_SKIP_MAX_RECORDS = 
     "mapreduce.map.skip.maxrecords";
-  public static final String MAP_COMBINE_MIN_SPISS = 
+  public static final String MAP_COMBINE_MIN_SPILLS = 
     "mapreduce.map.combine.minspills";
   public static final String MAP_OUTPUT_COMPRESS = 
     "mapreduce.map.output.compress";
@@ -212,6 +217,10 @@
     "mapreduce.reduce.shuffle.connect.timeout";
   public static final String SHUFFLE_READ_TIMEOUT = 
     "mapreduce.reduce.shuffle.read.timeout";
+  public static final String SHUFFLE_FETCH_FAILURES = 
+    "mapreduce.reduce.shuffle.maxfetchfailures";
+  public static final String SHUFFLE_NOTIFY_READERROR = 
+    "mapreduce.reduce.shuffle.notify.readerror";
   public static final String REDUCE_SKIP_INCR_PROC_COUNT = 
     "mapreduce.reduce.skip.proc-count.auto-incr";
   public static final String REDUCE_SKIP_MAXGROUPS = 
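
The new shuffle keys are ordinary Configuration properties. A small sketch of setting them on a job configuration (the values shown are illustrative, not defaults):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;

    public class ShuffleTuning {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt(JobContext.SHUFFLE_FETCH_FAILURES, 10);        // illustrative value
        conf.setBoolean(JobContext.SHUFFLE_NOTIFY_READERROR, true);
        System.out.println(conf.get("mapreduce.reduce.shuffle.maxfetchfailures"));
      }
    }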

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobCounter.properties?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobCounter.properties (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobCounter.properties Tue Jan 26 14:02:53 2010
@@ -1,3 +1,15 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
 # ResourceBundle properties file for job-level counters
 
 CounterGroupName=                  Job Counters 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Tue Jan 26 14:02:53 2010
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.mapreduce;
 
-import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
@@ -26,6 +26,7 @@
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,15 +36,15 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.codehaus.jackson.map.ObjectMapper;
 
 class JobSubmitter {
   protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
@@ -128,12 +129,7 @@
     String files = conf.get("tmpfiles");
     String libjars = conf.get("tmpjars");
     String archives = conf.get("tmparchives");
-      
-    /*
-     * set this user's id in job configuration, so later job files can be
-     * accessed using this user's id
-     */
-    job.setUGIAndUserGroupNames();
+    String jobJar = job.getJar();
 
     //
     // Figure out what fs the JobTracker is using.  Copy the
@@ -145,14 +141,18 @@
 
     // Create a number of filenames in the JobTracker's fs namespace
     LOG.debug("default FileSystem: " + jtFs.getUri());
-    jtFs.delete(submitJobDir, true);
+    if (jtFs.exists(submitJobDir)) {
+      throw new IOException("Not submitting job. Job directory " + submitJobDir
+          +" already exists!! This is unexpected.Please check what's there in" +
+          " that directory");
+    }
     submitJobDir = jtFs.makeQualified(submitJobDir);
     submitJobDir = new Path(submitJobDir.toUri().getPath());
-    FsPermission mapredSysPerms = new FsPermission(JOB_DIR_PERMISSION);
+    FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
     FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
-    Path filesDir = new Path(submitJobDir, "files");
-    Path archivesDir = new Path(submitJobDir, "archives");
-    Path libjarsDir = new Path(submitJobDir, "libjars");
+    Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
+    Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
+    Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
     // add all the command line files/ jars and archive
     // first copy them to jobtrackers filesystem 
       
@@ -185,7 +185,8 @@
       for (String tmpjars: libjarsArr) {
         Path tmp = new Path(tmpjars);
         Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
-        DistributedCache.addFileToClassPath(newPath, conf);
+        DistributedCache.addFileToClassPath(
+            new Path(newPath.toUri().getPath()), conf);
       }
     }
       
@@ -212,11 +213,26 @@
         DistributedCache.createSymlink(conf);
       }
     }
-      
+
+    if (jobJar != null) {   // copy jar to JobTracker's fs
+      // use jar name if job is not named. 
+      if ("".equals(job.getJobName())){
+        job.setJobName(new Path(jobJar).getName());
+      }
+      copyJar(new Path(jobJar), JobSubmissionFiles.getJobJar(submitJobDir), 
+          replication);
+      job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+    } else {
+      LOG.warn("No job jar file set.  User classes may not be found. "+
+      "See Job or Job#setJar(String).");
+    }
+
     //  set the timestamps of the archives and files
     TrackerDistributedCacheManager.determineTimestamps(conf);
+    //  set the public/private visibility of the archives and files
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf);
   }
-
+  
   private URI getPathURI(Path destPath, String fragment) 
       throws URISyntaxException {
     URI pathURI = destPath.toUri();
@@ -234,36 +250,20 @@
       short replication) throws IOException {
     jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
     jtFs.setReplication(submitJarFile, replication);
-    jtFs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
+    jtFs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
   }
+  
   /**
    * configure the jobconf of the user with the command line options of 
    * -libjars, -files, -archives.
    * @param conf
    * @throws IOException
    */
-  private void configureCommandLineOptions(Job job, Path submitJobDir,
-      Path submitJarFile) throws IOException {
+  private void copyAndConfigureFiles(Job job, Path jobSubmitDir) 
+  throws IOException {
     Configuration conf = job.getConfiguration();
     short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
-    copyAndConfigureFiles(job, submitJobDir, replication);
-    
-    /* set this user's id in job configuration, so later job files can be
-     * accessed using this user's id
-     */
-    String originalJarPath = job.getJar();
-
-    if (originalJarPath != null) {           // copy jar to JobTracker's fs
-      // use jar name if job is not named. 
-      if ("".equals(job.getJobName())){
-        job.setJobName(new Path(originalJarPath).getName());
-      }
-      job.setJar(submitJarFile.toString());
-      copyJar(new Path(originalJarPath), submitJarFile, replication);
-    } else {
-      LOG.warn("No job jar file set.  User classes may not be found. "+
-               "See Job or Job#setJar(String).");
-    }
+    copyAndConfigureFiles(job, jobSubmitDir, replication);
 
     // Set the working directory
     if (job.getWorkingDirectory() == null) {
@@ -271,15 +271,6 @@
     }
 
   }
-
-  // job files are world-wide readable and owner writable
-  final private static FsPermission JOB_FILE_PERMISSION = 
-    FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-  // job submission directory is world readable/writable/executable
-  final static FsPermission JOB_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
-   
   /**
    * Internal method for submitting jobs to the system.
    * 
@@ -305,45 +296,79 @@
    *   </li>
    * </ol></p>
    * @param job the configuration to submit
+   * @param cluster the handle to the Cluster
    * @throws ClassNotFoundException
    * @throws InterruptedException
    * @throws IOException
    */
-  JobStatus submitJobInternal(Job job) throws ClassNotFoundException,
-      InterruptedException, IOException {
-    
+  @SuppressWarnings("unchecked")
+  JobStatus submitJobInternal(Job job, Cluster cluster) 
+  throws ClassNotFoundException, InterruptedException, IOException {
+    /*
+     * set this user's id in job configuration, so later job files can be
+     * accessed using this user's id
+     */
+    job.setUGIAndUserGroupNames();
+
+    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, 
+                                                     job.getConfiguration());
     //configure the command line options correctly on the submitting dfs
     Configuration conf = job.getConfiguration();
     JobID jobId = submitClient.getNewJobID();
-    Path submitJobDir = new Path(submitClient.getSystemDir(), jobId.toString());
-    Path submitJarFile = new Path(submitJobDir, "job.jar");
-    Path submitSplitFile = new Path(submitJobDir, "job.split");
-    configureCommandLineOptions(job, submitJobDir, submitJarFile);
-    Path submitJobFile = new Path(submitJobDir, "job.xml");
-    
-    checkSpecs(job);
+    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
+    JobStatus status = null;
+    try {
+      conf.set("mapreduce.job.dir", submitJobDir.toString());
+      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
+          + " as the submit dir");
+      copyAndConfigureFiles(job, submitJobDir);
+      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
 
-    // Create the splits for the job
-    LOG.info("Creating splits at " + jtFs.makeQualified(submitSplitFile));
-    int maps = writeSplits(job, submitSplitFile);
-    conf.set("mapred.job.split.file", submitSplitFile.toString());
-    conf.setInt("mapred.map.tasks", maps);
-    LOG.info("number of splits:" + maps);
-    
-    // Write job file to JobTracker's fs
-    writeConf(conf, submitJobFile);
-    
-    //
-    // Now, actually submit the job (using the submit name)
-    //
-    JobStatus status = submitClient.submitJob(jobId);
-    if (status != null) {
-      return status;
-    } else {
-      throw new IOException("Could not launch job");
+      checkSpecs(job);
+      
+      // create TokenStorage object with user secretKeys
+      String tokensFileName = conf.get("tokenCacheFile");
+      TokenStorage tokenStorage = null;
+      if(tokensFileName != null) {
+        LOG.info("loading secret keys from " + tokensFileName);
+        String localFileName = new Path(tokensFileName).toUri().getPath();
+        tokenStorage = new TokenStorage();
+        // read JSON
+        ObjectMapper mapper = new ObjectMapper();
+        Map<String, String> nm = 
+          mapper.readValue(new File(localFileName), Map.class);
+        
+        for(Map.Entry<String, String> ent: nm.entrySet()) {
+          LOG.debug("adding secret key alias="+ent.getKey());
+          tokenStorage.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+        }
+      }
+
+      // Create the splits for the job
+      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
+      int maps = writeSplits(job, submitJobDir);
+      conf.setInt("mapred.map.tasks", maps);
+      LOG.info("number of splits:" + maps);
+
+      // Write job file to submit dir
+      writeConf(conf, submitJobFile);
+      //
+      // Now, actually submit the job (using the submit name)
+      //
+      status = submitClient.submitJob(jobId, submitJobDir.toString(), tokenStorage);
+      if (status != null) {
+        return status;
+      } else {
+        throw new IOException("Could not launch job");
+      }
+    } finally {
+      if (status == null) {
+        LOG.info("Cleaning up the staging area " + submitJobDir);
+        jtFs.delete(submitJobDir, true);
+      }
     }
   }
-
+  
   private void checkSpecs(Job job) throws ClassNotFoundException, 
       InterruptedException, IOException {
     JobConf jConf = (JobConf)job.getConfiguration();
@@ -364,7 +389,7 @@
     // Write job file to JobTracker's fs        
     FSDataOutputStream out = 
       FileSystem.create(jtFs, jobFile, 
-                        new FsPermission(JOB_FILE_PERMISSION));
+                        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
     try {
       conf.writeXml(out);
     } finally {
@@ -372,81 +397,42 @@
     }
   }
   
+
   @SuppressWarnings("unchecked")
-  private <T extends InputSplit> 
-  int writeNewSplits(JobContext job, Path submitSplitFile) throws IOException,
+  private <T extends InputSplit>
+  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
       InterruptedException, ClassNotFoundException {
     Configuration conf = job.getConfiguration();
     InputFormat<?, ?> input =
       ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
-    
+
     List<InputSplit> splits = input.getSplits(job);
     T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
 
     // sort the splits into order based on size, so that the biggest
     // go first
     Arrays.sort(array, new SplitComparator());
-    DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile, 
-                                                 array.length);
-    try {
-      if (array.length != 0) {
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        Job.RawSplit rawSplit = new Job.RawSplit();
-        SerializationFactory factory = new SerializationFactory(conf);
-        Serializer<T> serializer = 
-          factory.getSerializer((Class<T>) array[0].getClass());
-        serializer.open(buffer);
-        for (T split: array) {
-          rawSplit.setClassName(split.getClass().getName());
-          buffer.reset();
-          serializer.serialize(split);
-          rawSplit.setDataLength(split.getLength());
-          rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-          rawSplit.setLocations(split.getLocations());
-          rawSplit.write(out);
-        }
-        serializer.close();
-      }
-    } finally {
-      out.close();
-    }
+    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, array);
     return array.length;
   }
-
-  static final int CURRENT_SPLIT_FILE_VERSION = 0;
-  static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-
-  private DataOutputStream writeSplitsFileHeader(Configuration conf,
-      Path filename, int length) throws IOException {
-    // write the splits to a file for the job tracker
-    FileSystem fs = filename.getFileSystem(conf);
-    FSDataOutputStream out = 
-      FileSystem.create(fs, filename, new FsPermission(JOB_FILE_PERMISSION));
-    out.write(SPLIT_FILE_HEADER);
-    WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
-    WritableUtils.writeVInt(out, length);
-    return out;
-  }
-
+  
   private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
-                  Path submitSplitFile) throws IOException,
+      Path jobSubmitDir) throws IOException,
       InterruptedException, ClassNotFoundException {
     JobConf jConf = (JobConf)job.getConfiguration();
-    // Create the splits for the job
-    LOG.debug("Creating splits at " + jtFs.makeQualified(submitSplitFile));
     int maps;
     if (jConf.getUseNewMapper()) {
-      maps = writeNewSplits(job, submitSplitFile);
+      maps = writeNewSplits(job, jobSubmitDir);
     } else {
-      maps = writeOldSplits(jConf, submitSplitFile);
+      maps = writeOldSplits(jConf, jobSubmitDir);
     }
     return maps;
   }
-
-  // method to write splits for old api mapper.
-  private int writeOldSplits(JobConf job, 
-      Path submitSplitFile) throws IOException {
-    org.apache.hadoop.mapred.InputSplit[] splits = 
+  
+  //method to write splits for old api mapper.
+  private int writeOldSplits(JobConf job, Path jobSubmitDir) 
+  throws IOException {
+    org.apache.hadoop.mapred.InputSplit[] splits =
     job.getInputFormat().getSplits(job, job.getNumMapTasks());
     // sort the splits into order based on size, so that the biggest
     // go first
@@ -468,24 +454,7 @@
         }
       }
     });
-    DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile,
-      splits.length);
-
-    try {
-      DataOutputBuffer buffer = new DataOutputBuffer();
-      Job.RawSplit rawSplit = new Job.RawSplit();
-      for (org.apache.hadoop.mapred.InputSplit split: splits) {
-        rawSplit.setClassName(split.getClass().getName());
-        buffer.reset();
-        split.write(buffer);
-        rawSplit.setDataLength(split.getLength());
-        rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-        rawSplit.setLocations(split.getLocations());
-        rawSplit.write(out);
-      }
-    } finally {
-      out.close();
-    }
+    JobSplitWriter.createSplitFiles(jobSubmitDir, job, splits);
     return splits.length;
   }
   
@@ -505,7 +474,7 @@
       } catch (IOException ie) {
         throw new RuntimeException("exception in compare", ie);
       } catch (InterruptedException ie) {
-        throw new RuntimeException("exception in compare", ie);        
+        throw new RuntimeException("exception in compare", ie);
       }
     }
   }
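
The token cache file named by the "tokenCacheFile" property is read as flat JSON, one secret per alias. A minimal sketch of the same Jackson read outside the submitter (the file path and alias contents are made up):

    import java.io.File;
    import java.util.Map;
    import org.codehaus.jackson.map.ObjectMapper;

    public class TokenFilePeek {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) throws Exception {
        // Flat JSON, e.g. {"alias1": "secret1", "alias2": "secret2"}
        ObjectMapper mapper = new ObjectMapper();
        Map<String, String> secrets =
            mapper.readValue(new File("/tmp/tokens.json"), Map.class);
        for (Map.Entry<String, String> e : secrets.entrySet()) {
          // submitJobInternal() wraps each entry via TokenStorage#addSecretKey.
          System.out.println("alias=" + e.getKey());
        }
      }
    }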

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.java Tue Jan 26 14:02:53 2010
@@ -24,6 +24,7 @@
   MAP_OUTPUT_RECORDS,
   MAP_SKIPPED_RECORDS,
   MAP_OUTPUT_BYTES,
+  SPLIT_RAW_BYTES,
   COMBINE_INPUT_RECORDS,
   COMBINE_OUTPUT_RECORDS,
   REDUCE_INPUT_GROUPS,
@@ -32,5 +33,8 @@
   REDUCE_OUTPUT_RECORDS,
   REDUCE_SKIPPED_GROUPS,
   REDUCE_SKIPPED_RECORDS,
-  SPILLED_RECORDS
+  SPILLED_RECORDS,
+  SHUFFLED_MAPS, 
+  FAILED_SHUFFLE,
+  MERGED_MAP_OUTPUTS
 }
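
The new entries are framework-maintained shuffle counters, but task code can still read them through the task context. A sketch, assuming a new-API Reducer (whether the values are final by cleanup time is release-dependent):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.TaskCounter;

    public class CounterPeekReducer
        extends Reducer<Text, LongWritable, Text, LongWritable> {
      @Override
      protected void cleanup(Context context)
          throws IOException, InterruptedException {
        long shuffled = context.getCounter(TaskCounter.SHUFFLED_MAPS).getValue();
        long failed = context.getCounter(TaskCounter.FAILED_SHUFFLE).getValue();
        System.out.println("shuffled maps=" + shuffled + ", failed shuffles=" + failed);
      }
    }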

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties Tue Jan 26 14:02:53 2010
@@ -1,3 +1,15 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
 # ResourceBundle properties file for Map-Reduce counters
 
 CounterGroupName=              Map-Reduce Framework
@@ -15,4 +27,6 @@
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
 SPILLED_RECORDS.name=          Spilled Records
-
+SHUFFLED_MAPS.name=            Shuffled Maps 
+FAILED_SHUFFLE.name=           Failed Shuffles
+MERGED_MAP_OUTPUTS.name=       Merged Map outputs

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Tue Jan 26 14:02:53 2010
@@ -24,6 +24,7 @@
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.DefaultTaskController;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 
@@ -198,9 +199,9 @@
       boolean isArchive, long confFileStamp,
       Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
 
-    return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
-        baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
-        honorSymLinkConf);
+    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
+            confFileStamp, currentWorkDir, honorSymLinkConf, false);
   }
 
   /**
@@ -277,8 +278,8 @@
     if (timestamp == null) {
       throw new IOException("TimeStamp of the uri couldnot be found");
     }
-    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
-          Long.parseLong(timestamp));
+    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .releaseCache(cache, conf, Long.parseLong(timestamp));
   }
   
   /**
@@ -294,7 +295,8 @@
   @Deprecated
   public static String makeRelative(URI cache, Configuration conf)
       throws IOException {
-    return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
+    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .makeRelative(cache, conf);
   }
 
   /**
@@ -657,6 +659,7 @@
    */
   @Deprecated
   public static void purgeCache(Configuration conf) throws IOException {
-    new TrackerDistributedCacheManager(conf).purgeCache();
+    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+        .purgeCache();
   }
 }
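
The changes above only reroute the deprecated static helpers through a DefaultTaskController; the client-facing registration API is untouched. Typical job-side usage, with an illustrative path (the '#' fragment names the symlink):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;

    public class CacheSetup {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedCache.addCacheFile(new URI("/user/me/lookup.dat#lookup"), conf);
        DistributedCache.createSymlink(conf);  // honor the '#lookup' symlink name
      }
    }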

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Tue Jan 26 14:02:53 2010
@@ -36,6 +36,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * Helper class of {@link TrackerDistributedCacheManager} that represents
@@ -43,9 +44,8 @@
  * by TaskRunner/LocalJobRunner to parse out the job configuration
  * and setup the local caches.
  * 
- * <b>This class is internal to Hadoop, and should not be treated as a public
- * interface.</b>
  */
+@InterfaceAudience.Private
 public class TaskDistributedCacheManager {
   private final TrackerDistributedCacheManager distributedCacheManager;
   private final Configuration taskConf;
@@ -66,6 +66,7 @@
       REGULAR,
       ARCHIVE
     }
+    boolean isPublic = true;
     /** Whether to decompress */
     final FileType type;
     final long timestamp;
@@ -73,10 +74,11 @@
     final boolean shouldBeAddedToClassPath;
     boolean localized = false;
 
-    private CacheFile(URI uri, FileType type, long timestamp, 
+    private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp, 
         boolean classPath) {
       this.uri = uri;
       this.type = type;
+      this.isPublic = isPublic;
       this.timestamp = timestamp;
       this.shouldBeAddedToClassPath = classPath;
     }
@@ -87,7 +89,7 @@
      * files.
      */
     private static List<CacheFile> makeCacheFiles(URI[] uris, 
-        String[] timestamps, Path[] paths, FileType type) {
+        String[] timestamps, String cacheVisibilities[], Path[] paths, FileType type) {
       List<CacheFile> ret = new ArrayList<CacheFile>();
       if (uris != null) {
         if (uris.length != timestamps.length) {
@@ -103,7 +105,8 @@
           URI u = uris[i];
           boolean isClassPath = (null != classPaths.get(u.getPath()));
           long t = Long.parseLong(timestamps[i]);
-          ret.add(new CacheFile(u, type, t, isClassPath));
+          ret.add(new CacheFile(u, type, Boolean.valueOf(cacheVisibilities[i]),
+              t, isClassPath));
         }
       }
       return ret;
@@ -127,11 +130,13 @@
     this.cacheFiles.addAll(
         CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
             DistributedCache.getFileTimestamps(taskConf),
+            TrackerDistributedCacheManager.getFileVisibilities(taskConf),
             DistributedCache.getFileClassPaths(taskConf),
             CacheFile.FileType.REGULAR));
     this.cacheFiles.addAll(
         CacheFile.makeCacheFiles(DistributedCache.getCacheArchives(taskConf),
           DistributedCache.getArchiveTimestamps(taskConf),
+          TrackerDistributedCacheManager.getArchiveVisibilities(taskConf),
           DistributedCache.getArchiveClassPaths(taskConf), 
           CacheFile.FileType.ARCHIVE));
   }
@@ -144,9 +149,9 @@
    * file, if necessary.
    */
   public void setup(LocalDirAllocator lDirAlloc, File workDir, 
-      String cacheSubdir) throws IOException {
+      String privateCacheSubdir, String publicCacheSubDir) throws IOException {
     setupCalled = true;
-    
+      
     if (cacheFiles.isEmpty()) {
       return;
     }
@@ -159,11 +164,14 @@
       URI uri = cacheFile.uri;
       FileSystem fileSystem = FileSystem.get(uri, taskConf);
       FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
-
+      String cacheSubdir = publicCacheSubDir;
+      if (!cacheFile.isPublic) {
+        cacheSubdir = privateCacheSubdir;
+      }
       Path p = distributedCacheManager.getLocalCache(uri, taskConf,
           cacheSubdir, fileStatus, 
           cacheFile.type == CacheFile.FileType.ARCHIVE,
-          cacheFile.timestamp, workdirPath, false);
+          cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
       cacheFile.setLocalized(true);
 
       if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
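
The per-file visibilities arrive as a configuration array parallel to the cache-file URIs and timestamps, one boolean per entry. A sketch of that round trip, assuming the comma-separated layout implied by Boolean.valueOf(cacheVisibilities[i]) above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;

    public class VisibilityPeek {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // One boolean per cache file, aligned with mapreduce.job.cache.files.
        conf.set(JobContext.CACHE_FILE_VISIBILITIES, "true,false");
        for (String v : conf.getStrings(JobContext.CACHE_FILE_VISIBILITIES)) {
          System.out.println("public=" + Boolean.valueOf(v));
        }
      }
    }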

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Jan 26 14:02:53 2010
@@ -30,15 +30,22 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * Manages a single machine's instance of a cross-job
@@ -46,9 +53,8 @@
  * by a TaskTracker (or something that emulates it,
  * like LocalJobRunner).
  * 
- * <b>This class is internal to Hadoop, and should not be treated as a public
- * interface.</b>
  */
+@InterfaceAudience.Private
 public class TrackerDistributedCacheManager {
   // cacheID to cacheStatus mapping
   private TreeMap<String, CacheStatus> cachedArchives = 
@@ -66,14 +72,32 @@
   
   private LocalDirAllocator lDirAllocator;
   
+  private TaskController taskController;
+  
   private Configuration trackerConf;
   
   private Random random = new Random();
 
-  public TrackerDistributedCacheManager(Configuration conf) throws IOException {
+  private MRAsyncDiskService asyncDiskService;
+  
+  public TrackerDistributedCacheManager(Configuration conf,
+      TaskController taskController) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
     this.trackerConf = conf;
     this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
+    this.taskController = taskController;
+  }
+
+  /**
+   * Creates a TrackerDistributedCacheManager with a MRAsyncDiskService.
+   * @param asyncDiskService Provides a set of ThreadPools for async disk 
+   *                         operations.  
+   */
+  public TrackerDistributedCacheManager(Configuration conf,
+      TaskController taskController, MRAsyncDiskService asyncDiskService)
+      throws IOException {
+    this(conf, taskController);
+    this.asyncDiskService = asyncDiskService;
   }
 
   /**
@@ -101,6 +125,7 @@
    * launches
    * NOTE: This is effectively always on since r696957, since there is no code
    * path that does not use this.
+   * @param isPublic whether the cache file is publicly accessible or private
    * @return the path to directory where the archives are unjarred in case of
    * archives, the path to the file where the file is copied locally
    * @throws IOException
@@ -108,7 +133,7 @@
   Path getLocalCache(URI cache, Configuration conf,
       String subDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf)
+      Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
       throws IOException {
     String key = getKey(cache, conf, confFileStamp);
     CacheStatus lcacheStatus;
@@ -117,13 +142,13 @@
       lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
         // was never localized
+        String uniqueString = String.valueOf(random.nextLong());
         String cachePath = new Path (subDir, 
-          new Path(String.valueOf(random.nextLong()),
-            makeRelative(cache, conf))).toString();
+          new Path(uniqueString, makeRelative(cache, conf))).toString();
         Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
           fileStatus.getLen(), trackerConf);
-        lcacheStatus = new CacheStatus(
-          new Path(localPath.toString().replace(cachePath, "")), localPath); 
+        lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
+          cachePath, "")), localPath, new Path(subDir), uniqueString);
         cachedArchives.put(key, lcacheStatus);
       }
 
@@ -137,7 +162,7 @@
       synchronized (lcacheStatus) {
         if (!lcacheStatus.isInited()) {
           localizedPath = localizeCache(conf, cache, confFileStamp,
-              lcacheStatus, fileStatus, isArchive);
+              lcacheStatus, fileStatus, isArchive, isPublic);
           lcacheStatus.initComplete();
         } else {
           localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
@@ -242,23 +267,47 @@
     // do the deletion, after releasing the global lock
     for (CacheStatus lcacheStatus : deleteSet) {
       synchronized (lcacheStatus) {
-        FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-        LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+        deleteLocalPath(asyncDiskService,
+            FileSystem.getLocal(conf), lcacheStatus.localizedLoadPath);
         // decrement the size of the cache from baseDirSize
         synchronized (baseDirSize) {
-          Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+          Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
           if ( dirSize != null ) {
             dirSize -= lcacheStatus.size;
-            baseDirSize.put(lcacheStatus.baseDir, dirSize);
+            baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
           } else {
             LOG.warn("Cannot find record of the baseDir: " + 
-                     lcacheStatus.baseDir + " during delete!");
+                     lcacheStatus.localizedBaseDir + " during delete!");
           }
         }
       }
     }
   }
 
+  /**
+   * Delete a local path with asyncDiskService if available,
+   * or otherwise synchronously with local file system.
+   */
+  private static void deleteLocalPath(MRAsyncDiskService asyncDiskService,
+      LocalFileSystem fs, Path path) throws IOException {
+    boolean deleted = false;
+    if (asyncDiskService != null) {
+      // Try to delete using asyncDiskService
+      String localPathToDelete = 
+        path.toUri().getPath();
+      deleted = asyncDiskService.moveAndDeleteAbsolutePath(localPathToDelete);
+      if (!deleted) {
+        LOG.warn("Cannot find DistributedCache path " + localPathToDelete
+            + " on any of the asyncDiskService volumes!");
+      }
+    }
+    if (!deleted) {
+      // If no asyncDiskService, we will delete the files synchronously
+      fs.delete(path, true);
+    }
+    LOG.info("Deleted path " + path);
+  }
+  
   /*
    * Returns the relative path of the dir this cache will be localized in
    * relative path that this cache will be localized in. For
@@ -305,6 +354,51 @@
 
     return fileSystem.getFileStatus(filePath).getModificationTime();
   }
+  
+  /**
+   * Returns a boolean to denote whether a cache file is visible to all (public)
+   * or not
+   * @param conf
+   * @param uri
+   * @return true if the path in the uri is visible to all, false otherwise
+   * @throws IOException
+   */
+  static boolean isPublic(Configuration conf, URI uri) throws IOException {
+    FileSystem fs = FileSystem.get(uri, conf);
+    Path current = new Path(uri.getPath());
+    //the leaf level file should be readable by others
+    if (!checkPermissionOfOther(fs, current, FsAction.READ)) {
+      return false;
+    }
+    current = current.getParent();
+    while (current != null) {
+      //the subdirs in the path should have execute permissions for others
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+        return false;
+      }
+      current = current.getParent();
+    }
+    return true;
+  }
+  /**
+   * Checks for a given path whether the Other permissions on it 
+   * imply the permission in the passed FsAction
+   * @param fs
+   * @param path
+   * @param action
+   * @return true if the path in the uri is visible to all, false otherwise
+   * @throws IOException
+   */
+  private static boolean checkPermissionOfOther(FileSystem fs, Path path,
+      FsAction action) throws IOException {
+    FileStatus status = fs.getFileStatus(path);
+    FsPermission perms = status.getPermission();
+    FsAction otherAction = perms.getOtherAction();
+    if (otherAction.implies(action)) {
+      return true;
+    }
+    return false;
+  }
 
   private Path checkCacheStatusValidity(Configuration conf,
       URI cache, long confFileStamp,
@@ -316,13 +410,13 @@
     // Has to be 
     if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
                           cacheStatus, fileStatus)) {
-      throw new IOException("Stale cache file: " + cacheStatus.localLoadPath + 
+      throw new IOException("Stale cache file: " + cacheStatus.localizedLoadPath + 
                             " for cache-file: " + cache);
     }
 
     LOG.info(String.format("Using existing cache of %s->%s",
-        cache.toString(), cacheStatus.localLoadPath));
-    return cacheStatus.localLoadPath;
+        cache.toString(), cacheStatus.localizedLoadPath));
+    return cacheStatus.localizedLoadPath;
   }
   
   private void createSymlink(Configuration conf, URI cache,
@@ -337,7 +431,7 @@
     File flink = new File(link);
     if (doSymlink){
       if (!flink.exists()) {
-        FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
+        FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
       }
     }
   }
@@ -348,21 +442,21 @@
                                     URI cache, long confFileStamp,
                                     CacheStatus cacheStatus,
                                     FileStatus fileStatus,
-                                    boolean isArchive)
+                                    boolean isArchive, boolean isPublic)
   throws IOException {
     FileSystem fs = FileSystem.get(cache, conf);
     FileSystem localFs = FileSystem.getLocal(conf);
     Path parchive = null;
     if (isArchive) {
-      parchive = new Path(cacheStatus.localLoadPath,
-        new Path(cacheStatus.localLoadPath.getName()));
+      parchive = new Path(cacheStatus.localizedLoadPath,
+        new Path(cacheStatus.localizedLoadPath.getName()));
     } else {
-      parchive = cacheStatus.localLoadPath;
+      parchive = cacheStatus.localizedLoadPath;
     }
 
     if (!localFs.mkdirs(parchive.getParent())) {
       throw new IOException("Mkdirs failed to create directory " +
-          cacheStatus.localLoadPath.toString());
+          cacheStatus.localizedLoadPath.toString());
     }
 
     String cacheId = cache.getPath();
@@ -392,29 +486,45 @@
       FileUtil.getDU(new File(parchive.getParent().toString()));
     cacheStatus.size = cacheSize;
     synchronized (baseDirSize) {
-      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+      Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
       if( dirSize == null ) {
         dirSize = Long.valueOf(cacheSize);
       } else {
         dirSize += cacheSize;
       }
-      baseDirSize.put(cacheStatus.baseDir, dirSize);
+      baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
     }
 
-    // do chmod here
-    try {
-      //Setting recursive permission to grant everyone read and execute
-      FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-    } catch(InterruptedException e) {
-      LOG.warn("Exception in chmod" + e.toString());
-    }
+    // set proper permissions for the localized directory
+    setPermissions(conf, cacheStatus, isPublic);
 
     // update cacheStatus to reflect the newly cached file
     cacheStatus.mtime = getTimestamp(conf, cache);
 
     LOG.info(String.format("Cached %s as %s",
-             cache.toString(), cacheStatus.localLoadPath));
-    return cacheStatus.localLoadPath;
+             cache.toString(), cacheStatus.localizedLoadPath));
+    return cacheStatus.localizedLoadPath;
+  }
+
+  private void setPermissions(Configuration conf, CacheStatus cacheStatus,
+      boolean isPublic) throws IOException {
+    if (isPublic) {
+      Path localizedUniqueDir = cacheStatus.getLocalizedUniqueDir();
+      LOG.info("Doing chmod on localdir: " + localizedUniqueDir);
+      try {
+        FileUtil.chmod(localizedUniqueDir.toString(), "ugo+rx", true);
+      } catch (InterruptedException e) {
+        LOG.warn("Exception in chmod: " + e.toString());
+        throw new IOException(e);
+      }
+    } else {
+      // invoke taskcontroller to set permissions
+      DistributedCacheFileContext context = new DistributedCacheFileContext(
+          conf.get(JobContext.USER_NAME), new File(cacheStatus.localizedBaseDir
+              .toString()), cacheStatus.localizedBaseDir,
+          cacheStatus.uniqueString);
+      taskController.initializeDistributedCacheFile(context);
+    }
   }
 
   private static boolean isTarFile(String filename) {
@@ -485,10 +595,10 @@
 
   static class CacheStatus {
     // the local load path of this cache
-    Path localLoadPath;
+    Path localizedLoadPath;
 
     //the base dir where the cache lies
-    Path baseDir;
+    Path localizedBaseDir;
 
     //the size of this cache
     long size;
@@ -501,18 +611,28 @@
 
     // is it initialized ?
     boolean inited = false;
+
+    // The sub directory (tasktracker/archive or tasktracker/user/archive),
+    // under which the file will be localized
+    Path subDir;
     
-    public CacheStatus(Path baseDir, Path localLoadPath) {
+    // unique string used in the construction of local load path
+    String uniqueString;
+
+    public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
+        String uniqueString) {
       super();
-      this.localLoadPath = localLoadPath;
+      this.localizedLoadPath = localLoadPath;
       this.refcount = 0;
       this.mtime = -1;
-      this.baseDir = baseDir;
+      this.localizedBaseDir = baseDir;
       this.size = 0;
+      this.subDir = subDir;
+      this.uniqueString = uniqueString;
     }
     
     Path getBaseDir(){
-      return this.baseDir;
+      return this.localizedBaseDir;
     }
     
     // mark it as initialized
@@ -524,6 +644,10 @@
     boolean isInited() {
       return inited;
     }
+    
+    Path getLocalizedUniqueDir() {
+      return new Path(localizedBaseDir, new Path(subDir, uniqueString));
+    }
   }
 
   /**
@@ -535,7 +659,7 @@
     synchronized (cachedArchives) {
       for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
         try {
-          localFs.delete(f.getValue().localLoadPath, true);
+          deleteLocalPath(asyncDiskService, localFs, f.getValue().localizedLoadPath);
         } catch (IOException ie) {
           LOG.debug("Error cleaning up cache", ie);
         }
@@ -585,8 +709,60 @@
       setFileTimestamps(job, fileTimestamps.toString());
     }
   }
+  /**
+   * Determines the visibilities of the distributed cache files and
+   * archives. The visibility of a cache path is "public" if the leaf
+   * component has READ permission for others and every parent directory
+   * has EXECUTE permission for others.
+   * @param job the job's configuration
+   * @throws IOException
+   */
+  public static void determineCacheVisibilities(Configuration job) 
+  throws IOException {
+    URI[] tarchives = DistributedCache.getCacheArchives(job);
+    if (tarchives != null) {
+      StringBuffer archiveVisibilities = 
+        new StringBuffer(String.valueOf(isPublic(job, tarchives[0])));
+      for (int i = 1; i < tarchives.length; i++) {
+        archiveVisibilities.append(",");
+        archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i])));
+      }
+      setArchiveVisibilities(job, archiveVisibilities.toString());
+    }
+    URI[] tfiles = DistributedCache.getCacheFiles(job);
+    if (tfiles != null) {
+      StringBuffer fileVisibilities = 
+        new StringBuffer(String.valueOf(isPublic(job, tfiles[0])));
+      for (int i = 1; i < tfiles.length; i++) {
+        fileVisibilities.append(",");
+        fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i])));
+      }
+      setFileVisibilities(job, fileVisibilities.toString());
+    }
+  }
   
   /**
+   * Get the booleans on whether the files are public or not.  Used by 
+   * internal DistributedCache and MapReduce code.
+   * @param conf The configuration which stored the visibilities
+   * @return a string array of booleans
+   */
+  static String[] getFileVisibilities(Configuration conf) {
+    return conf.getStrings(JobContext.CACHE_FILE_VISIBILITIES);
+  }
+
+  /**
+   * Get the booleans on whether the archives are public or not.  Used by 
+   * internal DistributedCache and MapReduce code.
+   * @param conf The configuration which stored the visibilities
+   * @return a string array of booleans 
+   */
+  static String[] getArchiveVisibilities(Configuration conf) {
+    return conf.getStrings(JobContext.CACHE_ARCHIVES_VISIBILITIES);
+  }
+
+  /**
    * This method checks if there is a conflict in the fragment names 
    * of the uris. Also makes sure that each uri has a fragment. It 
    * is only to be called if you want to create symlinks for 
@@ -631,6 +807,28 @@
     }
     return true;
   }
+  /**
+   * This is to set the public/private visibility of the archives to be
+   * localized.
+   * 
+   * @param conf Configuration which stores the visibilities
+   * @param booleans comma-separated list of booleans (true - public).
+   * The order should be the same as the order in which the archives are added.
+   */
+  static void setArchiveVisibilities(Configuration conf, String booleans) {
+    conf.set(JobContext.CACHE_ARCHIVES_VISIBILITIES, booleans);
+  }
+
+  /**
+   * This is to set the public/private visibility of the files to be localized.
+   * 
+   * @param conf Configuration which stores the visibilities
+   * @param booleans comma-separated list of booleans (true - public).
+   * The order should be the same as the order in which the files are added.
+   */
+  static void setFileVisibilities(Configuration conf, String booleans) {
+    conf.set(JobContext.CACHE_FILE_VISIBILITIES, booleans);
+  }
 
   /**
    * This is to check the timestamp of the archives to be localized.
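
For reference, here is a minimal standalone sketch of the visibility rule that
the determineCacheVisibilities javadoc above describes: the leaf must be
world-readable and every ancestor directory world-executable. The helper name
isPublicSketch is illustrative; the patch's actual isPublic() helper may
differ in detail.

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;

    class CacheVisibilitySketch {
      // A cache URI is "public" iff the leaf is readable by others and
      // every ancestor directory is executable by others.
      static boolean isPublicSketch(Configuration conf, URI cache)
          throws IOException {
        FileSystem fs = FileSystem.get(cache, conf);
        Path leaf = new Path(cache.getPath());
        if (!fs.getFileStatus(leaf).getPermission()
            .getOtherAction().implies(FsAction.READ)) {
          return false;
        }
        for (Path dir = leaf.getParent(); dir != null; dir = dir.getParent()) {
          if (!fs.getFileStatus(dir).getPermission()
              .getOtherAction().implies(FsAction.EXECUTE)) {
            return false;
          }
        }
        return true;
      }
    }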

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Tue Jan 26 14:02:53 2010
@@ -24,8 +24,11 @@
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -80,6 +83,18 @@
 
   private HistoryCleaner historyCleanerThread = null;
 
+  private Map<JobID, MovedFileInfo> jobHistoryFileMap = 
+    Collections.<JobID,MovedFileInfo>synchronizedMap(
+        new LinkedHashMap<JobID, MovedFileInfo>());
+
+  private static class MovedFileInfo {
+    private final String historyFile;
+    private final long timestamp;
+    public MovedFileInfo(String historyFile, long timestamp) {
+      this.historyFile = historyFile;
+      this.timestamp = timestamp;
+    }
+  }
   /**
    * Initialize Job History Module
    * @param jt Job Tracker handle
@@ -196,6 +211,16 @@
   }
 
   /**
+   * Given the job id, return the history file path from the cache
+   */
+  public String getHistoryFilePath(JobID jobId) {
+    MovedFileInfo info = jobHistoryFileMap.get(jobId);
+    if (info == null) {
+      return null;
+    }
+    return info.historyFile;
+  }
+  /**
    * Create an event writer for the Job represented by the jobID.
    * This should be the first call to history for a job
    * @param jobId
@@ -383,7 +408,8 @@
           historyFileDonePath = new Path(done, 
               historyFile.getName()).toString();
         }
-        
+        jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath, 
+            System.currentTimeMillis()));
         jobTracker.retireJob(org.apache.hadoop.mapred.JobID.downgrade(id),
             historyFileDonePath);
 
@@ -481,6 +507,21 @@
             }
           }
         }
+        // walk the map to purge stale entries from jobHistoryFileMap
+        synchronized (jobHistoryFileMap) {
+          Iterator<Entry<JobID, MovedFileInfo>> it = 
+            jobHistoryFileMap.entrySet().iterator();
+          while (it.hasNext()) {
+            MovedFileInfo info = it.next().getValue();
+            if (now - info.timestamp > maxAgeOfHistoryFiles) {
+              it.remove();
+            } else {
+              // entries are in insertion (and hence timestamp) order, so no
+              // further entries need to be checked
+              break;
+            }
+          }
+        }
       } catch (IOException ie) {
         LOG.info("Error cleaning up history directory" + 
             StringUtils.stringifyException(ie));
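
The purge loop above relies on jobHistoryFileMap being a LinkedHashMap:
entries are inserted as jobs retire, so iteration order matches timestamp
order and the scan can stop at the first entry that is still fresh. A
self-contained sketch of the pattern, with illustrative names:

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    class PurgeSketch {
      // Drop entries older than maxAge; stop at the first fresh one,
      // since insertion order implies ascending timestamps.
      static void purge(LinkedHashMap<String, Long> timestamps,
                        long now, long maxAge) {
        Iterator<Map.Entry<String, Long>> it =
            timestamps.entrySet().iterator();
        while (it.hasNext()) {
          if (now - it.next().getValue() > maxAge) {
            it.remove();
          } else {
            break;
          }
        }
      }
    }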

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java Tue Jan 26 14:02:53 2010
@@ -23,18 +23,25 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.TimeZone;
+import java.lang.reflect.Method;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A RecordReader that reads records from an Oracle SQL table.
  */
 public class OracleDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
 
+  private static final Log LOG = LogFactory.getLog(OracleDBRecordReader.class);
+
   public OracleDBRecordReader(DBInputFormat.DBInputSplit split, 
       Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
       String cond, String [] fields, String table) throws SQLException {
     super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+    setSessionTimeZone(conn);
   }
 
   /** Returns the query for selecting the records from an Oracle DB. */
@@ -86,4 +93,43 @@
 
     return query.toString();
   }
+
+  /**
+   * Set the session time zone on the given connection.
+   * @param conn      Connection object
+   * @throws          SQLException if the time zone cannot be set
+   */
+  private void setSessionTimeZone(Connection conn) throws SQLException {
+    // Need to use reflection to call setSessionTimeZone on the
+    // OracleConnection class, because the Oracle-specific Java libraries
+    // are not accessible in this context.
+    Method method;
+    try {
+      method = conn.getClass().getMethod(
+              "setSessionTimeZone", new Class [] {String.class});
+    } catch (Exception ex) {
+      LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(), ex);
+      // rethrow SQLException
+      throw new SQLException(ex);
+    }
+
+    // Need to set the time zone in order for Java
+    // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE"
+    String clientTimeZone = TimeZone.getDefault().getID();
+    try {
+      method.setAccessible(true);
+      method.invoke(conn, clientTimeZone);
+      LOG.info("Time zone has been set");
+    } catch (Exception ex) {
+      LOG.warn("Time zone " + clientTimeZone +
+               " could not be set on the Oracle database.");
+      LOG.info("Setting default time zone: UTC");
+      try {
+        method.invoke(conn, "UTC");
+      } catch (Exception ex2) {
+        LOG.error("Could not set time zone for Oracle connection", ex2);
+        // rethrow as a SQLException, wrapping the UTC fallback failure
+        throw new SQLException(ex2);
+      }
+    }
+  }
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java Tue Jan 26 14:02:53 2010
@@ -34,7 +34,8 @@
  * 
  * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
  * 
- * The map output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec".
+ * The map output field list spec is under attribute 
+ * "mapreduce.fieldsel.map.output.key.value.fields.spec".
  * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
  * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
  * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
@@ -45,7 +46,8 @@
  * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
  * and use fields 6,5,1,2,3,7 and above for values.
  * 
- * The reduce output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec".
+ * The reduce output field list spec is under attribute 
+ * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
  * 
  * The reducer extracts output key/value pairs in a similar manner, except that
  * the key is never ignored.
@@ -57,9 +59,9 @@
   public static final String DATA_FIELD_SEPERATOR = 
     "mapreduce.fieldsel.data.field.separator";
   public static final String MAP_OUTPUT_KEY_VALUE_SPEC = 
-    "mapreduce.fieldsel.mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec";
+    "mapreduce.fieldsel.map.output.key.value.fields.spec";
   public static final String REDUCE_OUTPUT_KEY_VALUE_SPEC = 
-    "mapreduce.fieldsel.mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec";
+    "mapreduce.fieldsel.reduce.output.key.value.fields.spec";
 
 
   /**
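
To make the corrected property names above concrete, here is a hypothetical
job setup using the documented spec syntax: per the class javadoc,
"4,3,0,1:6,5,1-3,7-" selects fields 4,3,0,1 for the key and fields 6,5,1,2,3
plus 7-and-above for the value. The separator and the reduce spec value are
examples only.

    import org.apache.hadoop.conf.Configuration;

    class FieldSelSketch {
      static void configure(Configuration conf) {
        conf.set("mapreduce.fieldsel.data.field.separator", "\t");
        conf.set("mapreduce.fieldsel.map.output.key.value.fields.spec",
                 "4,3,0,1:6,5,1-3,7-");
        conf.set("mapreduce.fieldsel.reduce.output.key.value.fields.spec",
                 "0-2:3-");
      }
    }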

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java Tue Jan 26 14:02:53 2010
@@ -42,7 +42,8 @@
  * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
  * 
  * The map output field list spec is under attribute 
- * "mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec". The value is expected to be like
+ * "mapreduce.fieldsel.map.output.key.value.fields.spec". 
+ * The value is expected to be like
  * "keyFieldsSpec:valueFieldsSpec" key/valueFieldsSpec are comma (,) separated
  * field spec: fieldSpec,fieldSpec,fieldSpec ... Each field spec can be a 
  * simple number (e.g. 5) specifying a specific field, or a range (like 2-5)

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java Tue Jan 26 14:02:53 2010
@@ -41,7 +41,8 @@
  * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
  * 
  * The reduce output field list spec is under attribute 
- * "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec". The value is expected to be like
+ * "mapreduce.fieldsel.reduce.output.key.value.fields.spec". 
+ * The value is expected to be like
  * "keyFieldsSpec:valueFieldsSpec" key/valueFieldsSpec are comma (,) 
  * separated field spec: fieldSpec,fieldSpec,fieldSpec ... Each field spec
  * can be a simple number (e.g. 5) specifying a specific field, or a range

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Tue Jan 26 14:02:53 2010
@@ -238,6 +238,14 @@
     return result;
   }
   
+  /**
+   * A factory method that makes the split for this class. It can be
+   * overridden by sub-classes to return their own sub-types.
+   */
+  protected FileSplit makeSplit(Path file, long start, long length, 
+                                String[] hosts) {
+    return new FileSplit(file, start, length, hosts);
+  }
 
   /** 
    * Generate the list of files and make them into FileSplits.
@@ -261,20 +269,20 @@
         long bytesRemaining = length;
         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
-          splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
+          splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
                                    blkLocations[blkIndex].getHosts()));
           bytesRemaining -= splitSize;
         }
         
         if (bytesRemaining != 0) {
-          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
+          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 
                      blkLocations[blkLocations.length-1].getHosts()));
         }
       } else if (length != 0) {
-        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
       } else { 
         //Create empty hosts array for zero length files
-        splits.add(new FileSplit(path, 0, length, new String[0]));
+        splits.add(makeSplit(path, 0, length, new String[0]));
       }
     }
     LOG.debug("Total # of splits: " + splits.size());
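
The point of the new protected makeSplit() factory shows up in a sketch like
the following: a subclass swaps in its own FileSplit subtype without copying
getSplits(). TaggedTextInputFormat and MyFileSplit are hypothetical names.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    class TaggedTextInputFormat extends TextInputFormat {
      static class MyFileSplit extends FileSplit {
        public MyFileSplit() { super(); } // required for deserialization
        public MyFileSplit(Path file, long start, long length,
                           String[] hosts) {
          super(file, start, length, hosts);
        }
      }

      @Override
      protected FileSplit makeSplit(Path file, long start, long length,
                                    String[] hosts) {
        return new MyFileSplit(file, start, length, hosts);
      }
    }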

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java Tue Jan 26 14:02:53 2010
@@ -47,11 +47,11 @@
   public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
   
   final public static String FILTER_CLASS = 
-    "mapreduce.input.mapreduce.input.mapreduce.input.sequencefileinputfilter.class";
+    "mapreduce.input.sequencefileinputfilter.class";
   final public static String FILTER_FREQUENCY = 
-    "mapreduce.input.mapreduce.input.mapreduce.input.sequencefileinputfilter.frequency";
+    "mapreduce.input.sequencefileinputfilter.frequency";
   final public static String FILTER_REGEX = 
-    "mapreduce.input.mapreduce.input.mapreduce.input.sequencefileinputfilter.regex";
+    "mapreduce.input.sequencefileinputfilter.regex";
     
   public SequenceFileInputFilter() {
   }
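
With the de-duplicated keys above, configuration reads naturally. The filter
implementation named here, the class's PercentFilter (which keeps every
FILTER_FREQUENCY-th record), is assumed from the known filter set, and the
values are examples:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter;

    class FilterConfSketch {
      static void apply(Configuration conf) {
        conf.set(SequenceFileInputFilter.FILTER_CLASS,
            "org.apache.hadoop.mapreduce.lib.input."
            + "SequenceFileInputFilter$PercentFilter");
        conf.setInt(SequenceFileInputFilter.FILTER_FREQUENCY, 10);
      }
    }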

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java Tue Jan 26 14:02:53 2010
@@ -125,7 +125,6 @@
     Deserializer deserializer = factory.getDeserializer(inputSplitClass);
     deserializer.open((DataInputStream)in);
     inputSplit = (InputSplit)deserializer.deserialize(inputSplit);
-    deserializer.close();
   }
 
   private Class<?> readClass(DataInput in) throws IOException {
@@ -147,7 +146,6 @@
           factory.getSerializer(inputSplitClass);
     serializer.open((DataOutputStream)out);
     serializer.serialize(inputSplit);
-    serializer.close();
   }
 
   public Configuration getConf() {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java Tue Jan 26 14:02:53 2010
@@ -128,7 +128,6 @@
         factory.getSerializer(s.getClass());
       serializer.open((DataOutputStream)out);
       serializer.serialize(s);
-      serializer.close();
     }
   }
 
@@ -155,7 +154,6 @@
         Deserializer deserializer = factory.getDeserializer(cls[i]);
         deserializer.open((DataInputStream)in);
         splits[i] = (InputSplit)deserializer.deserialize(splits[i]);
-        deserializer.close();
       }
     } catch (ClassNotFoundException e) {
       throw new IOException("Failed split init", e);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Tue Jan 26 14:02:53 2010
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.Counters;
@@ -33,6 +32,7 @@
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
 
 /** 
@@ -87,8 +87,12 @@
    * Version 28: Added getJobHistoryDir() as part of MAPREDUCE-975.
    * Version 29: Added reservedSlots, runningTasks and totalJobSubmissions
    *             to ClusterMetrics as part of MAPREDUCE-1048.
+   * Version 30: Job submission files are uploaded to a staging area under
+   *             user home dir. JobTracker reads the required files from the
+   *             staging area using user credentials passed via the rpc.
+   * Version 31: Added TokenStorage to submitJob          
    */
-  public static final long versionID = 29L;
+  public static final long versionID = 31L;
 
   /**
    * Allocate a name for the job.
@@ -100,9 +104,8 @@
   /**
    * Submit a Job for execution.  Returns the latest profile for
    * that job.
-   * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
    */
-  public JobStatus submitJob(JobID jobName) 
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, TokenStorage ts) 
   throws IOException, InterruptedException;
 
   /**
@@ -219,7 +222,15 @@
    * 
    * @return the system directory where job-specific files are to be placed.
    */
-  public String getSystemDir() throws IOException, InterruptedException;  
+  public String getSystemDir() throws IOException, InterruptedException;
+  
+  /**
+   * Get a hint from the JobTracker 
+   * where job-specific files are to be placed.
+   * 
+   * @return the directory where job-specific files are to be placed.
+   */
+  public String getStagingAreaDir() throws IOException, InterruptedException;
 
   /**
    * Gets the directory location of the completed job history files.
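
A rough client-side flow implied by protocol versions 30 and 31 above:
getNewJobID is assumed from the existing interface, the imports are assumed
from this branch's package layout, and the upload of the submission files is
elided.

    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.JobStatus;
    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
    import org.apache.hadoop.mapreduce.security.TokenStorage;

    class SubmitSketch {
      static JobStatus submit(ClientProtocol client, TokenStorage tokens)
          throws Exception {
        JobID jobId = client.getNewJobID();
        // job.xml, the job jar and split files go under the staging area
        String jobSubmitDir = client.getStagingAreaDir() + "/" + jobId;
        // ... upload of the submission files elided ...
        return client.submitJob(jobId, jobSubmitDir, tokens);
      }
    }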

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Tue Jan 26 14:02:53 2010
@@ -22,16 +22,13 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.URL;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
 
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.SecretKey;
 import javax.servlet.http.HttpServletRequest;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.record.Utils;
 
 /**
@@ -43,62 +40,17 @@
 public class SecureShuffleUtils {
   public static final String HTTP_HEADER_URL_HASH = "UrlHash";
   public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
-  public static KeyGenerator kg = null;
-  public static String DEFAULT_ALG="HmacSHA1";
-  
-  private SecretKeySpec secretKey;
-  private Mac mac;
-  
   /**
-   * static generate keys
-   * @return new encoded key
-   * @throws NoSuchAlgorithmException
+   * File name used on HDFS for the generated job token.
    */
-  public static byte[] getNewEncodedKey() throws NoSuchAlgorithmException{
-    SecretKeySpec key = generateKey(DEFAULT_ALG);
-    return key.getEncoded();
-  }
-  
-  private static SecretKeySpec generateKey(String alg) throws NoSuchAlgorithmException {
-    if(kg==null) {
-      kg = KeyGenerator.getInstance(alg);
-    }
-    return (SecretKeySpec) kg.generateKey();
-  }
-
-  /**
-   * Create a util object with alg and key
-   * @param sKeyEncoded
-   * @throws NoSuchAlgorithmException
-   * @throws InvalidKeyException
-   */
-  public SecureShuffleUtils(byte [] sKeyEncoded) 
-  throws  IOException{
-    secretKey = new SecretKeySpec(sKeyEncoded, SecureShuffleUtils.DEFAULT_ALG);
-    try {
-      mac = Mac.getInstance(DEFAULT_ALG);
-      mac.init(secretKey);
-    } catch (NoSuchAlgorithmException nae) {
-      throw new IOException(nae);
-    } catch( InvalidKeyException ie) {
-      throw new IOException(ie);
-    }
-  }
-  
-  /** 
-   * get key as byte[]
-   * @return encoded key
-   */
-  public byte [] getEncodedKey() {
-    return secretKey.getEncoded();
-  }
+  public static final String JOB_TOKEN_FILENAME = "jobToken";
   
   /**
    * Base64 encoded hash of msg
   * @param msg
+   * @param key
    */
-  public String generateHash(byte[] msg) {
-    return new String(Base64.encodeBase64(generateByteHash(msg)));
+  public static String generateHash(byte[] msg, SecretKey key) {
+    return new String(Base64.encodeBase64(generateByteHash(msg, key)));
   }
   
   /**
@@ -106,8 +58,8 @@
    * @param msg
    * @return
    */
-  private byte[] generateByteHash(byte[] msg) {
-    return mac.doFinal(msg);
+  private static byte[] generateByteHash(byte[] msg, SecretKey key) {
+    return JobTokenSecretManager.computeHash(msg, key);
   }
   
   /**
@@ -115,20 +67,21 @@
    * @param newHash
    * @return true if is the same
    */
-  private boolean verifyHash(byte[] hash, byte[] msg) {
-    byte[] msg_hash = generateByteHash(msg);
+  private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
+    byte[] msg_hash = generateByteHash(msg, key);
     return Utils.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
   }
   
   /**
    * Aux util to calculate hash of a String
    * @param enc_str
+   * @param key
    * @return Base64 encodedHash
    * @throws IOException
    */
-  public String hashFromString(String enc_str) 
+  public static String hashFromString(String enc_str, SecretKey key) 
   throws IOException {
-    return generateHash(enc_str.getBytes()); 
+    return generateHash(enc_str.getBytes(), key); 
   }
   
   /**
@@ -137,11 +90,11 @@
    * @param msg
    * @throws IOException if not the same
    */
-  public void verifyReply(String base64Hash, String msg)
+  public static void verifyReply(String base64Hash, String msg, SecretKey key)
   throws IOException {
     byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
     
-    boolean res = verifyHash(hash, msg.getBytes());
+    boolean res = verifyHash(hash, msg.getBytes(), key);
     
     if(res != true) {
       throw new IOException("Verification of the hashReply failed");
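
Usage of the now-static helpers above, with both sides sharing the job
token's SecretKey; the message string and the key acquisition are
illustrative.

    import java.io.IOException;
    import javax.crypto.SecretKey;
    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;

    class ShuffleHashSketch {
      static void roundTrip(SecretKey jobTokenSecret) throws IOException {
        String msg = "/mapOutput?job=job_201001261402_0001&map=m_000000";
        String hash = SecureShuffleUtils.hashFromString(msg, jobTokenSecret);
        // the peer recomputes the HMAC and throws IOException on mismatch
        SecureShuffleUtils.verifyReply(hash, msg, jobTokenSecret);
      }
    }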

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java Tue Jan 26 14:02:53 2010
@@ -80,6 +80,8 @@
   public static final String JT_AVG_BLACKLIST_THRESHOLD = 
     "mapreduce.jobtracker.blacklist.average.threshold";
   public static final String JT_SYSTEM_DIR = "mapreduce.jobtracker.system.dir";
+  public static final String JT_STAGING_AREA_ROOT = 
+    "mapreduce.jobtracker.staging.root.dir";
   public static final String JT_MAX_TRACKER_BLACKLISTS = 
     "mapreduce.jobtracker.tasktracker.maxblacklists";
   public static final String JT_JOBHISTORY_MAXAGE = 
@@ -88,4 +90,6 @@
     "mapreduce.jobtracker.maxmapmemory.mb";
   public static final String JT_MAX_REDUCEMEMORY_MB = 
     "mapreduce.jobtracker.maxreducememory.mb";
+  public static final String MAX_JOB_SPLIT_METAINFO_SIZE = 
+  "mapreduce.job.split.metainfo.maxsize";
 }
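
Illustrative use of the two new keys above; the values shown are examples,
not defaults from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

    class JTConfSketch {
      static void apply(Configuration conf) {
        conf.set(JTConfig.JT_STAGING_AREA_ROOT,
                 "/tmp/hadoop/mapred/staging");
        conf.setLong(JTConfig.MAX_JOB_SPLIT_METAINFO_SIZE,
                     10L * 1024 * 1024);
      }
    }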


