hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r816587 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/jobhistory/ src/java/org/apache/hadoop/mapreduce/protocol/ src/test/mapred/org/apache/had...
Date Fri, 18 Sep 2009 11:36:30 GMT
Author: sharad
Date: Fri Sep 18 11:36:29 2009
New Revision: 816587

URL: http://svn.apache.org/viewvc?rev=816587&view=rev
Log:
MAPREDUCE-975. Add an API in job client to get the history file url for a given job id. Contributed
by Sharad Agarwal.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=816587&r1=816586&r2=816587&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 18 11:36:29 2009
@@ -378,6 +378,9 @@
     MAPREDUCE-781. Let the name of distcp jobs be configurable. (Venkatesh S
     via cdouglas)
 
+    MAPREDUCE-975. Add an API in job client to get the history file url for 
+    a given job id. (sharad)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=816587&r1=816586&r2=816587&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Sep 18 11:36:29 2009
@@ -3502,6 +3502,14 @@
     return fs.makeQualified(sysDir).toString();
   }
   
+  /**
+   * @see 
+   * org.apache.hadoop.mapreduce.protocol.ClientProtocol#getJobHistoryDir()
+   */
+  public String getJobHistoryDir() {
+    return jobHistory.getCompletedJobHistoryLocation().toString();
+  }
+
   ///////////////////////////////////////////////////////////////
   // JobTracker methods
   ///////////////////////////////////////////////////////////////

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=816587&r1=816586&r2=816587&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Sep 18 11:36:29 2009
@@ -558,6 +558,9 @@
     return fs.makeQualified(sysDir).toString();
   }
 
+  public String getJobHistoryDir() {
+    return null;
+  }
 
   @Override
   public QueueInfo[] getChildQueues(String queueName) throws IOException {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=816587&r1=816586&r2=816587&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java Fri Sep 18 11:36:29 2009
@@ -30,18 +30,22 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 
 /**
  * Provides a way to access information about the map/reduce cluster.
  */
 public class Cluster {
   private ClientProtocol client;
+  private UnixUserGroupInformation ugi;
   private Configuration conf;
   private FileSystem fs = null;
   private Path sysDir = null;
+  private Path jobHistoryDir = null;
 
   static {
     Configuration.addDefaultResource("mapred-default.xml");
@@ -50,19 +54,21 @@
   
   public Cluster(Configuration conf) throws IOException {
     this.conf = conf;
+    this.ugi = Job.getUGI(conf);
     client = createClient(conf);
   }
 
   public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
       throws IOException {
     this.conf = conf;
+    this.ugi = Job.getUGI(conf);
     client = createRPCProxy(jobTrackAddr, conf);
   }
 
   private ClientProtocol createRPCProxy(InetSocketAddress addr,
       Configuration conf) throws IOException {
     return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
-      ClientProtocol.versionID, addr, Job.getUGI(conf), conf,
+      ClientProtocol.versionID, addr, ugi, conf,
       NetUtils.getSocketFactory(conf, ClientProtocol.class));
   }
 
@@ -218,6 +224,24 @@
   }
 
   /**
+   * Get the job history file path for a given job id. The job history file at 
+   * this path may or may not be existing depending on the job completion state.
+   * The file is present only for the completed jobs.
+   * @param jobId the JobID of the job submitted by the current user.
+   * @return the file path of the job history file
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public String getJobHistoryUrl(JobID jobId) throws IOException, 
+    InterruptedException {
+    if (jobHistoryDir == null) {
+      jobHistoryDir = new Path(client.getJobHistoryDir());
+    }
+    return JobHistory.getJobHistoryFile(jobHistoryDir, jobId, 
+        ugi.getUserName()).toString();
+  }
+
+  /**
    * Gets the Queue ACLs for current user
    * @return array of QueueAclsInfo object for current user.
    * @throws IOException

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=816587&r1=816586&r2=816587&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Fri Sep 18 11:36:29 2009
@@ -173,15 +173,6 @@
     }
   }
 
-  /** Get the done directory */
-  public synchronized String getDoneJobHistoryFileName(JobConf jobConf,
-      JobID id) throws IOException {
-    if (done == null) {
-      return null;
-    }
-    return getJobHistoryFileName(jobConf, id, done, doneDirFs);
-  }
-
   /**
    * Get the history location
    */
@@ -197,46 +188,11 @@
   }
 
   /**
-   * @param dir The directory where to search.
+   * Get the job history file path
    */
-  private synchronized String getJobHistoryFileName(JobConf jobConf,
-      JobID id, Path dir, FileSystem fs)
-  throws IOException {
-    String user = getUserName(jobConf);
-    // Make the pattern matching the job's history file
-    final Pattern historyFilePattern =
-      Pattern.compile(id.toString() + "_" + user + "+");
-    // a path filter that matches the parts of the filenames namely
-    //  - job-id, user name
-    PathFilter filter = new PathFilter() {
-      public boolean accept(Path path) {
-        String fileName = path.getName();
-        return historyFilePattern.matcher(fileName).find();
-      }
-    };
-  
-    FileStatus[] statuses = fs.listStatus(dir, filter);
-    String filename = null;
-    if (statuses.length == 0) {
-      LOG.info("Nothing to recover for job " + id);
-    } else {
-      filename = statuses[0].getPath().getName();
-      LOG.info("Recovered job history filename for job " + id + " is "
-          + filename);
-    }
-    return filename;
-  }
-
-  String getNewJobHistoryFileName(JobConf conf, JobID jobId) {
-    return jobId.toString() +
-    "_" + getUserName(conf);
-  }
-
-  /**
-   * Get the job history file path given the history filename
-   */
-  private Path getJobHistoryLogLocation(String logFileName) {
-    return logDir == null ? null : new Path(logDir, logFileName);
+  public static Path getJobHistoryFile(Path dir, JobID jobId, 
+      String user) {
+    return new Path(dir, jobId.toString() + "_" + user);
   }
 
   /**
@@ -248,9 +204,7 @@
    */
   public void setupEventWriter(JobID jobId, JobConf jobConf)
   throws IOException {
-    String logFileName = getNewJobHistoryFileName(jobConf, jobId);
-  
-    Path logFile = getJobHistoryLogLocation(logFileName);
+    Path logFile = getJobHistoryFile(logDir, jobId, getUserName(jobConf));
   
     if (logDir == null) {
       LOG.info("Log Directory is null, returning");

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=816587&r1=816586&r2=816587&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Fri Sep 18 11:36:29 2009
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.Counters;
@@ -82,9 +83,10 @@
    *              added new api's getRootQueues and
    *              getChildQueues(String queueName)
    * Version 27: Changed protocol to use new api objects. And the protocol is 
-   *             renamed from JobSubmissionProtocol to ClientProtocol.          
+   *             renamed from JobSubmissionProtocol to ClientProtocol.
+   * Version 28: Added getJobHistoryDir() as part of MAPREDUCE-975.
    */
-  public static final long versionID = 27L;
+  public static final long versionID = 28L;
 
   /**
    * Allocate a name for the job.
@@ -216,7 +218,15 @@
    * @return the system directory where job-specific files are to be placed.
    */
   public String getSystemDir() throws IOException, InterruptedException;  
-  
+
+  /**
+   * Gets the directory location of the completed job history files.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public String getJobHistoryDir() 
+  throws IOException, InterruptedException;
+
   /**
    * Gets set of Queues associated with the Job Tracker
    * 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=816587&r1=816586&r2=816587&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Fri Sep 18 11:36:29 2009
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -577,7 +578,7 @@
     validateTaskAttemptLevelKeyValues(mr, job, jobInfo);
   }
 
-  public void testDoneFolderOnHDFS() throws IOException {
+  public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
     MiniMRCluster mr = null;
     try {
       JobConf conf = new JobConf();
@@ -644,6 +645,10 @@
       // Framework history log file location
       Path logFile = new Path(doneDir, logFileName);
       FileSystem fileSys = logFile.getFileSystem(conf);
+
+      Cluster cluster = new Cluster(conf);
+      assertEquals("Client returned wrong history url", logFile.toString(), 
+          cluster.getJobHistoryUrl(id));
    
       // Check if the history file exists
       assertTrue("History file does not exist", fileSys.exists(logFile));
@@ -790,9 +795,14 @@
       Path doneDir) throws IOException {
     String name = null;
     for (int i = 0; name == null && i < 20; i++) {
-      name = jobHistory.getDoneJobHistoryFileName(conf, id);
+      Path path = JobHistory.getJobHistoryFile(
+          jobHistory.getCompletedJobHistoryLocation(), id, conf.getUser());
+      if (path.getFileSystem(conf).exists(path)) {
+        name = path.toString();
+      }
       UtilsForTests.waitFor(1000);
     }
+    assertNotNull("Job history file not created", name);
     return name;
   }
 



Mime
View raw message