Subject: svn commit: r477876 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/
Date: Tue, 21 Nov 2006 20:40:25 -0000
To: hadoop-commits@lucene.apache.org
From: cutting@apache.org

Author: cutting
Date: Tue Nov 21 12:40:24 2006
New Revision: 477876

URL: http://svn.apache.org/viewvc?view=rev&rev=477876
Log:
HADOOP-76. Implement speculative reduce. Contributed by Sanjay.
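The core mechanism of this change is that each speculative reduce attempt writes its output under a private temporary path and renames it into place only on commit. The following is a minimal local-disk sketch of that idea, not the Hadoop code. It assumes java.nio.file in place of the Hadoop FileSystem API, and the class and method names (PhasedWriterSketch, write, commit, abort) are hypothetical. Only two details come from the diff: the <systemDir>/<jobId>/<tipId>/<taskId> temp layout and the rule that an existing final file wins unless overwrite is set. Here overwrite is a single per-instance flag (per file in the diff), and the .crc special case is omitted.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical local-disk sketch of PhasedFileSystem's commit protocol.
// java.nio.file stands in for org.apache.hadoop.fs.FileSystem.
class PhasedWriterSketch {
  // Per-attempt scratch directory: <systemDir>/<jobId>/<tipId>/<taskId>
  private final Path taskTempDir;
  // Final path -> temporary path (plays the role of finalNameToFileInfo).
  private final Map<Path, Path> finalToTemp = new HashMap<>();
  private final boolean overwrite;

  PhasedWriterSketch(Path systemDir, String jobId, String tipId,
                     String taskId, boolean overwrite) {
    this.taskTempDir = systemDir.resolve(jobId).resolve(tipId).resolve(taskId);
    this.overwrite = overwrite;
  }

  // Writes data under the attempt's temp dir; nothing appears at finalPath yet.
  void write(Path finalPath, String data) throws IOException {
    Files.createDirectories(taskTempDir);
    Path temp = taskTempDir.resolve(finalPath.getFileName().toString());
    Files.write(temp, data.getBytes(StandardCharsets.UTF_8));
    finalToTemp.put(finalPath, temp);
  }

  // Moves each temp file to its final name. If the final file already exists
  // and overwrite is off (e.g. another attempt committed first), the temp
  // file is discarded instead.
  void commit() throws IOException {
    for (Map.Entry<Path, Path> e : finalToTemp.entrySet()) {
      Path finalPath = e.getKey();
      Path temp = e.getValue();
      if (Files.exists(finalPath) && !overwrite) {
        Files.deleteIfExists(temp);
        continue;
      }
      Path parent = finalPath.getParent();
      if (parent != null) {
        Files.createDirectories(parent);
      }
      Files.move(temp, finalPath, StandardCopyOption.REPLACE_EXISTING);
    }
    finalToTemp.clear();
  }

  // Deletes every uncommitted temp file; final locations are never touched.
  void abort() throws IOException {
    for (Path temp : finalToTemp.values()) {
      Files.deleteIfExists(temp);
    }
    finalToTemp.clear();
  }
}
```

With two attempts of the same reduce, whichever commits first publishes part-00000. The slower attempt's commit finds the file already present and simply deletes its own temporary copy, so duplicate attempts never clobber each other's output.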
Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Nov 21 12:40:24 2006
@@ -116,6 +116,12 @@
     for sorting datanode list by various columns.
     (Raghu Angadi via cutting)
+35. HADOOP-76. Implement speculative reduce. Now when a job is
+    configured for speculative execution, both maps and reduces will
+    execute speculatively. Reduce outputs are written to a temporary
+    location and moved to the final location when the reduce is complete.
+ (Sanjay Dahiya via cutting) + Release 0.8.0 - 2006-11-03 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=477876&r1=477875&r2=477876 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Tue Nov 21 12:40:24 2006 @@ -154,12 +154,12 @@ FileSplit split = new FileSplit(new Path(conf.get("map.input.file")), conf.getLong("map.input.start", 0), conf.getLong("map.input.length", 0)); - task = new MapTask(jobId, jobFilename.toString(), taskId, partition, - split); + task = new MapTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), + taskId, partition, split); } else { int numMaps = conf.getNumMapTasks(); fillInMissingMapOutputs(local, taskId, numMaps, conf); - task = new ReduceTask(jobId, jobFilename.toString(), taskId, + task = new ReduceTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), taskId, partition, numMaps); } task.setConf(conf); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=477876&r1=477875&r2=477876 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Nov 21 12:40:24 2006 @@ -357,7 +357,7 @@ return null; } - double avgProgress = status.reduceProgress() / reduces.length; + double avgProgress = status.reduceProgress() ; int target = findNewTask(tts, clusterSize, avgProgress, reduces, null); if (target == -1) { @@ -438,8 +438,10 @@ LOG.info("Choosing 
normal task " + tasks[i].getTIPId()); return i; } else if (specTarget == -1 && - task.hasSpeculativeTask(avgProgress)) { + task.hasSpeculativeTask(avgProgress) && + ! task.hasRunOnMachine(taskTracker)) { specTarget = i; + break ; } } } @@ -691,6 +693,11 @@ // so we remove that directory to cleanup FileSystem fs = FileSystem.get(conf); fs.delete(new Path(profile.getJobFile()).getParent()); + + // Delete temp dfs dirs created if any, like in case of + // speculative exn of reduces. + // String tempDir = conf.get("mapred.system.dir") + "/job_" + uniqueString; + // fs.delete(new Path(tempDir)); } catch (IOException e) { LOG.warn("Error cleaning up "+profile.getJobId()+": "+e); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=477876&r1=477875&r2=477876 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Nov 21 12:40:24 2006 @@ -96,8 +96,10 @@ // run a map task for each split job.setNumReduceTasks(1); // force a single reduce task for (int i = 0; i < splits.length; i++) { - mapIds.add("map_" + newId()); - MapTask map = new MapTask(jobId, file, (String)mapIds.get(i), i, + String mapId = "map_" + newId() ; + mapIds.add(mapId); + MapTask map = new MapTask(jobId, file, "tip_m_" + mapId, + mapId, i, splits[i]); JobConf localConf = new JobConf(job); map.localizeConfiguration(localConf); @@ -126,7 +128,7 @@ { ReduceTask reduce = new ReduceTask(jobId, file, - reduceId, 0, mapIds.size()); + "tip_r_0001", reduceId, 0, mapIds.size()); JobConf localConf = new JobConf(job); reduce.localizeConfiguration(localConf); reduce.setConf(localConf); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=477876&r1=477875&r2=477876 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Tue Nov 21 12:40:24 2006 @@ -85,9 +85,9 @@ public MapTask() {} - public MapTask(String jobId, String jobFile, String taskId, + public MapTask(String jobId, String jobFile, String tipId, String taskId, int partition, FileSplit split) { - super(jobId, jobFile, taskId, partition); + super(jobId, jobFile, tipId, taskId, partition); this.split = split; myMetrics = new MapTaskMetrics(taskId); } Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=auto&rev=477876 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java Tue Nov 21 12:40:24 2006 @@ -0,0 +1,464 @@ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FSOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Progressable; + +/** + * This class acts as a proxy to the actual file system being used. + * It writes files to a temporary location and on + * commit, moves to final location. 
On abort or a failure in + * commit, the temporary file is deleted. + * PhasedFileSystem works in the context of a task. A different instance of + * PhasedFileSystem should be used for every task. + * Temporary files are written in ("mapred.system.dir")// + * If a task opens a large number of files in succession, then it's + * better to commit(Path) individual files when done. Otherwise + * commit() can be used to commit all open files at once. + */ +class PhasedFileSystem extends FileSystem { + + private FileSystem baseFS ; + // Map from final file name to temporary file info + private Map<Path, FileInfo> finalNameToFileInfo = new HashMap<Path, FileInfo>(); + + private String jobid ; + private String tipid ; + private String taskid ; + + private Path tempDir ; + /** + * This Constructor is used to wrap a FileSystem object in a + * PhasedFileSystem. + * @param fs base file system object + * @param jobid JobId + * @param tipid tipId + * @param taskid taskId + */ + public PhasedFileSystem(FileSystem fs, String jobid, + String tipid, String taskid) { + super(fs.getConf()); // not used + + this.baseFS = fs ; + this.jobid = jobid; + this.tipid = tipid ; + this.taskid = taskid ; + + tempDir = new Path(baseFS.getConf().get("mapred.system.dir") ); + } + /** + * This Constructor is used to wrap a FileSystem object in a + * PhasedFileSystem. + * @param fs base file system object + * @param conf JobConf + */ + public PhasedFileSystem(FileSystem fs, JobConf conf) { + super(fs.getConf()); // not used + + this.baseFS = fs ; + this.jobid = conf.get("mapred.job.id"); + this.tipid = conf.get("mapred.tip.id"); + this.taskid = conf.get("mapred.task.id") ; + + tempDir = new Path(baseFS.getConf().get("mapred.system.dir") ); + } + /** + * This Constructor should not be used in this or any derived class.
+ * @param conf + */ + protected PhasedFileSystem(Configuration conf){ + super(conf); + throw new UnsupportedOperationException("Operation not supported"); + } + + private Path setupFile(Path finalFile, boolean overwrite) throws IOException{ + if( finalNameToFileInfo.containsKey(finalFile) ){ + if( !overwrite ){ + throw new IOException("Error, file already exists : " + + finalFile.toString()); + }else{ + // delete the temp file so that a new one can be created. + FileInfo fInfo = finalNameToFileInfo.get(finalFile); + try{ + fInfo.getOpenFileStream().close(); + }catch(IOException ioe){ + // ignore if already closed + } + baseFS.delete( fInfo.getTempPath() ); + finalNameToFileInfo.remove(finalFile); + } + } + + String uniqueFile = jobid + "/" + tipid + "/" + taskid + "/" + finalFile.getName(); + + Path tempPath = new Path(tempDir, new Path(uniqueFile)); + FileInfo fInfo = new FileInfo(tempPath, finalFile, overwrite); + + finalNameToFileInfo.put(finalFile, fInfo); + + return tempPath ; + } + + @Override + public FSOutputStream createRaw( + Path f, boolean overwrite, short replication, long blockSize) + throws IOException { + + // for reduce output it's checked in the job client, but check it anyway, + // as tasks with side effects may write to locations not set in the jobconf + // as the output path.
+ if( baseFS.exists(f) && !overwrite ){ + throw new IOException("Error creating file - already exists : " + f); + } + FSOutputStream stream = + baseFS.createRaw(setupFile(f, overwrite), overwrite, replication, blockSize); + finalNameToFileInfo.get(f).setOpenFileStream(stream); + return stream; + } + + @Override + public FSOutputStream createRaw( + Path f, boolean overwrite, short replication, long blockSize, + Progressable progress) + throws IOException { + if( baseFS.exists(f) && !overwrite ){ + throw new IOException("Error creating file - already exists : " + f); + } + FSOutputStream stream = + baseFS.createRaw(setupFile(f, overwrite), overwrite, replication, + blockSize, progress); + finalNameToFileInfo.get(f).setOpenFileStream(stream); + return stream ; + } + /** + * Commits a single file to its final location as passed in create* methods. + * If a file already exists in the final location and overwrite was not requested, the temporary file is deleted. + * @param fPath path to final file. + * @throws IOException thrown if commit fails + */ + public void commit(Path fPath) throws IOException{ + commit(fPath, true); + } + + // use extra method arg to avoid ConcurrentModificationException + // if committing using this method while iterating. + private void commit(Path fPath , boolean removeFromMap)throws IOException{ + FileInfo fInfo = finalNameToFileInfo.get(fPath) ; + if( null == fInfo ){ + throw new IOException("Error committing file! File was not created " + + "with PhasedFileSystem : " + fPath); + } + try{ + fInfo.getOpenFileStream().close(); + }catch(IOException ioe){ + // ignore if already closed + ioe.printStackTrace(); + } + Path tempPath = fInfo.getTempPath(); + // ignore .crc files + if(! tempPath.toString().endsWith(".crc")){ + if( !baseFS.exists(fPath) || fInfo.isOverwrite()){ + if(! baseFS.exists(fPath.getParent())){ + baseFS.mkdirs(fPath.getParent()); + } + + if( baseFS.exists(fPath) && fInfo.isOverwrite()){ + baseFS.delete(fPath); + } + + try { + if( !
baseFS.rename(fInfo.getTempPath(), fPath) ){ + // delete the temp file if rename failed + baseFS.delete(fInfo.getTempPath()); + } + }catch(IOException ioe){ + // rename failed, log error and delete temp files + LOG.error("PhasedFileSystem failed to commit file : " + fPath + + " error : " + ioe.getMessage()); + baseFS.delete(fInfo.getTempPath()); + } + }else{ + // delete temp file + baseFS.delete(fInfo.getTempPath()); + } + // done with the file + if( removeFromMap ){ + finalNameToFileInfo.remove(fPath); + } + } + } + + /** + * Commits files to their final locations as passed in create* methods. + * If a file already exists in the final location and overwrite was not requested, the temporary file is deleted. + * This method ignores crc files (ending with .crc). This method doesn't close + * the file system, so it can still be used to create new files. + * @throws IOException if any file fails to commit + */ + public void commit() throws IOException { + for( Path fPath : finalNameToFileInfo.keySet()){ + commit(fPath, false); + } + // safe to clear map now + finalNameToFileInfo.clear(); + } + /** + * Aborts a single file. The temporary file created for it is deleted. + * @param p the path to final file as passed in create* call + * @throws IOException if the file delete fails + */ + public void abort(Path p)throws IOException{ + abort(p, true); + } + + // use extra method arg to avoid ConcurrentModificationException + // if aborting using this method while iterating. + private void abort(Path p, boolean removeFromMap) throws IOException{ + FileInfo fInfo = finalNameToFileInfo.get(p); + if( null != fInfo ){ + try{ + fInfo.getOpenFileStream().close(); + }catch(IOException ioe){ + // ignore if already closed + } + baseFS.delete(fInfo.getTempPath()); + if( removeFromMap ){ + finalNameToFileInfo.remove(p); + } + } + } + /** + * Aborts file creation; all uncommitted files created by this PhasedFileSystem + * instance are deleted.
This does not close baseFS because a handle to baseFS may still + * exist and be used to create new files. + * + * @throws IOException + */ + public void abort() throws IOException { + for(Path fPath : finalNameToFileInfo.keySet() ){ + abort(fPath, false); + } + // safe to clean now + finalNameToFileInfo.clear(); + } + /** + * Closes base file system. + */ + public void close() throws IOException { + baseFS.close(); + } + + @Override + public short getReplication( + Path src) + throws IOException { + // keep replication same for temp file as for + // final file. + return baseFS.getReplication(src); + } + + @Override + public boolean setReplicationRaw( + Path src, short replication) + throws IOException { + // throw IOException for interface compatibility with + // base class. + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public boolean renameRaw( + Path src, Path dst) + throws IOException { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public boolean deleteRaw( + Path f) + throws IOException { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public boolean exists(Path f) + throws IOException { + return baseFS.exists(f); + } + + @Override + public boolean isDirectory(Path f) + throws IOException { + return baseFS.isDirectory(f); + } + + @Override + public long getLength(Path f) + throws IOException { + return baseFS.getLength(f); + } + + @Override + public Path[] listPathsRaw(Path f) + throws IOException { + return baseFS.listPathsRaw(f); + } + + @Override + public void setWorkingDirectory(Path new_dir) { + baseFS.setWorkingDirectory(new_dir); + } + + @Override + public Path getWorkingDirectory() { + return baseFS.getWorkingDirectory(); + } + + @Override + public boolean mkdirs(Path f) + throws IOException { + return baseFS.mkdirs(f) ; + } + + @Override + public void lock( + Path f, boolean shared) + throws IOException { + throw new
UnsupportedOperationException("Operation not supported"); + } + + @Override + public void release( + Path f) + throws IOException { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public void copyFromLocalFile( + Path src, Path dst) + throws IOException { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public void moveFromLocalFile( + Path src, Path dst) + throws IOException { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public void copyToLocalFile( + Path src, Path dst) + throws IOException { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public Path startLocalOutput( + Path fsOutputFile, Path tmpLocalFile) + throws IOException { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public void completeLocalOutput( + Path fsOutputFile, Path tmpLocalFile) + throws IOException { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public void reportChecksumFailure( + Path f, FSInputStream in, long start, long length, int crc) { + baseFS.reportChecksumFailure(f, in, start, length, crc); + } + + @Override + public long getBlockSize( + Path f) + throws IOException { + return baseFS.getBlockSize(f); + } + + @Override + public long getDefaultBlockSize() { + return baseFS.getDefaultBlockSize(); + } + + @Override + public short getDefaultReplication() { + return baseFS.getDefaultReplication(); + } + + @Override + public String[][] getFileCacheHints( + Path f, long start, long len) + throws IOException { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public String getName() { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override + public FSInputStream openRaw(Path f) + throws IOException { + return baseFS.openRaw(f); + } + + private class FileInfo { + 
private Path tempPath ; + private Path finalPath ; + private FSOutputStream openFileStream ; + private boolean overwrite ; + + FileInfo(Path tempPath, Path finalPath, boolean overwrite){ + this.tempPath = tempPath ; + this.finalPath = finalPath ; + this.overwrite = overwrite; + } + public FSOutputStream getOpenFileStream() { + return openFileStream; + } + public void setOpenFileStream( + FSOutputStream openFileStream) { + this.openFileStream = openFileStream; + } + public Path getFinalPath() { + return finalPath; + } + public void setFinalPath( + Path finalPath) { + this.finalPath = finalPath; + } + public boolean isOverwrite() { + return overwrite; + } + public void setOverwrite( + boolean overwrite) { + this.overwrite = overwrite; + } + public Path getTempPath() { + return tempPath; + } + public void setTempPath( + Path tempPath) { + this.tempPath = tempPath; + } + + } + +} Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=477876&r1=477875&r2=477876 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 21 12:40:24 2006 @@ -80,9 +80,9 @@ public ReduceTask() {} - public ReduceTask(String jobId, String jobFile, String taskId, + public ReduceTask(String jobId, String jobFile, String tipId, String taskId, int partition, int numMaps) { - super(jobId, jobFile, taskId, partition); + super(jobId, jobFile, tipId, taskId, partition); this.numMaps = numMaps; myMetrics = new ReduceTaskMetrics(taskId); } @@ -272,9 +272,18 @@ Reporter reporter = getReporter(umbilical, getProgress()); // make output collector - String name = getOutputName(getPartition()); - final RecordWriter out = - job.getOutputFormat().getRecordWriter(FileSystem.get(job), 
job, name, reporter); + String finalName = getOutputName(getPartition()); + boolean runSpeculative = job.getSpeculativeExecution(); + FileSystem fs = FileSystem.get(job) ; + + if( runSpeculative ){ + fs = new PhasedFileSystem (fs , + getJobId(), getTipId(), getTaskId()); + } + + final RecordWriter out = + job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter) ; + OutputCollector collector = new OutputCollector() { public void collect(WritableComparable key, Writable value) throws IOException { @@ -299,6 +308,9 @@ } finally { reducer.close(); out.close(reporter); + if( runSpeculative ){ + ((PhasedFileSystem)fs).commit(); + } } done(umbilical); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=477876&r1=477875&r2=477876 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Nov 21 12:40:24 2006 @@ -38,6 +38,7 @@ private String jobFile; // job configuration file private String taskId; // unique, includes job id private String jobId; // unique jobid + private String tipId ; private int partition; // id within job private TaskStatus.Phase phase ; // current phase of the task @@ -47,10 +48,12 @@ public Task() {} - public Task(String jobId, String jobFile, String taskId, int partition) { + public Task(String jobId, String jobFile, String tipId, + String taskId, int partition) { this.jobFile = jobFile; this.taskId = taskId; this.jobId = jobId; + this.tipId = tipId; this.partition = partition; } @@ -60,6 +63,7 @@ public void setJobFile(String jobFile) { this.jobFile = jobFile; } public String getJobFile() { return jobFile; } public String getTaskId() { return taskId; } + public String getTipId(){ return tipId ; } /** * Get the job name for this task. 
@@ -97,12 +101,14 @@ public void write(DataOutput out) throws IOException { UTF8.writeString(out, jobFile); + UTF8.writeString(out, tipId); UTF8.writeString(out, taskId); UTF8.writeString(out, jobId); out.writeInt(partition); } public void readFields(DataInput in) throws IOException { jobFile = UTF8.readString(in); + tipId = UTF8.readString(in); taskId = UTF8.readString(in); jobId = UTF8.readString(in); partition = in.readInt(); @@ -114,6 +120,7 @@ * Localize the given JobConf to be specific for this task. */ public void localizeConfiguration(JobConf conf) { + conf.set("mapred.tip.id", tipId); conf.set("mapred.task.id", taskId); conf.setBoolean("mapred.task.is.map",isMapTask()); conf.setInt("mapred.task.partition", partition); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=477876&r1=477875&r2=477876 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Nov 21 12:40:24 2006 @@ -18,10 +18,8 @@ package org.apache.hadoop.mapred; import org.apache.commons.logging.*; -import org.apache.hadoop.util.*; import java.text.NumberFormat; -import java.io.*; import java.util.*; @@ -44,6 +42,7 @@ static final int MAX_TASK_FAILURES = 4; static final double SPECULATIVE_GAP = 0.2; static final long SPECULATIVE_LAG = 60 * 1000; + static final int MAX_CONCURRENT_TASKS = 2; private static NumberFormat idFormat = NumberFormat.getInstance(); static { idFormat.setMinimumIntegerDigits(6); @@ -73,7 +72,9 @@ private boolean failed = false; private boolean killed = false; private TreeSet usableTaskIds = new TreeSet(); - private TreeSet recentTasks = new TreeSet(); + // Map from task Id -> TaskTracker Id, contains tasks that are + // currently 
running + private TreeMap activeTasks = new TreeMap(); private JobConf conf; private boolean runSpeculative; private Map> taskDiagnosticData = new TreeMap(); @@ -176,7 +177,7 @@ * @return true if any tasks are running */ public boolean isRunning() { - return !recentTasks.isEmpty(); + return !activeTasks.isEmpty(); } /** @@ -326,8 +327,8 @@ status.setFinishTime(System.currentTimeMillis()); } } - this.recentTasks.remove(taskid); - if (this.completes > 0) { + this.activeTasks.remove(taskid); + if (this.completes > 0 && this.isMapTask()) { this.completes--; } @@ -347,7 +348,7 @@ LOG.info("Task '" + taskid + "' has completed."); TaskStatus status = (TaskStatus) taskStatuses.get(taskid); status.setRunState(TaskStatus.State.SUCCEEDED); - recentTasks.remove(taskid); + activeTasks.remove(taskid); // // Now that the TIP is complete, the other speculative @@ -445,11 +446,19 @@ // in more depth eventually... // if (isMapTask() && - recentTasks.size() <= MAX_TASK_EXECS && + activeTasks.size() <= MAX_TASK_EXECS && runSpeculative && (averageProgress - progress >= SPECULATIVE_GAP) && (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) { return true; + }else{ + //Note: validate criteria for speculative reduce execution + if( runSpeculative && (activeTasks.size() < MAX_CONCURRENT_TASKS ) && + (averageProgress - progress >= SPECULATIVE_GAP) && + completes <= 0 && + (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) { + return true ; + } } return false; } @@ -469,13 +478,13 @@ String jobId = job.getProfile().getJobId(); if (isMapTask()) { - t = new MapTask(jobId, jobFile, taskid, partition, split); + t = new MapTask(jobId, jobFile, this.id, taskid, partition, split); } else { - t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps); + t = new ReduceTask(jobId, jobFile, this.id, taskid, partition, numMaps); } t.setConf(conf); - recentTasks.add(taskid); + activeTasks.put(taskid, taskTracker); // Ask JobTracker to note that the task exists
jobtracker.createTaskEntry(taskid, taskTracker, this); @@ -491,6 +500,15 @@ return machinesWhereFailed.contains(tracker); } + /** + * Was this task ever scheduled to run on this machine? + * @param tracker The task tracker name + * @return Was task scheduled on the tracker? + */ + public boolean hasRunOnMachine(String tracker){ + return this.activeTasks.values().contains(tracker) || + hasFailedOnMachine(tracker) ; + } /** * Get the number of machines where this task has failed. * @return the size of the failed machine set Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=477876&r1=477875&r2=477876 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 21 12:40:24 2006 @@ -1044,12 +1044,33 @@ failures += 1; } runner.kill(); + runstate = TaskStatus.State.KILLED; } else if (runstate == TaskStatus.State.UNASSIGNED) { if (wasFailure) { failures += 1; runstate = TaskStatus.State.FAILED; } else { runstate = TaskStatus.State.KILLED; + } + } + + // the temporary file names in speculative execution are generated in + // the launched JVM, and we don't talk to it when killing, so cleanup + // should happen here. Find the task id and delete the temp directory + // for the task. This applies only to killed speculative reduce instances. + + // Note: it would be better to couple this with delete localfiles, + // which is currently in conf; it doesn't belong there.
+ + if( !task.isMapTask() && + this.defaultJobConf.getSpeculativeExecution() ){ + try{ + String systemDir = task.getConf().get("mapred.system.dir"); + String taskTempDir = systemDir + "/" + + task.getJobId() + "/" + task.getTipId(); + fs.delete(new Path(taskTempDir)) ; + }catch(IOException e){ + LOG.warn("Error in deleting reduce temporary output",e); } } }
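The scheduling rule added in TaskInProgress is easier to follow when restated as pure functions. The sketch below is a hedged restatement, not the Hadoop class. SPECULATIVE_GAP, SPECULATIVE_LAG and MAX_CONCURRENT_TASKS are copied from the diff. MAX_TASK_EXECS is not shown in this diff, so it is passed in as a parameter, and the class name SpeculationSketch is hypothetical. In JobInProgress.findNewTask, a TIP becomes the speculative target only when hasSpeculativeTask(avgProgress) is true and hasRunOnMachine(taskTracker) is false. As a result, a backup attempt is never placed on a tracker that is already running, or has already failed, the same TIP. For reduces, avgProgress is now passed as status.reduceProgress() directly.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical restatement of the checks in this commit; plain parameters
// replace the TaskInProgress fields they are named after.
final class SpeculationSketch {
  static final double SPECULATIVE_GAP = 0.2;     // TaskInProgress.SPECULATIVE_GAP
  static final long SPECULATIVE_LAG = 60 * 1000; // ms, TaskInProgress.SPECULATIVE_LAG
  static final int MAX_CONCURRENT_TASKS = 2;     // TaskInProgress.MAX_CONCURRENT_TASKS

  private SpeculationSketch() {}

  // Mirrors hasSpeculativeTask(). The first branch is the pre-existing map
  // rule; the new else-branch (reached by reduces, and by maps that fail the
  // first test) allows a backup only while fewer than MAX_CONCURRENT_TASKS
  // attempts are active and no attempt has completed.
  static boolean hasSpeculativeTask(boolean isMap, boolean runSpeculative,
      int activeAttempts, int maxTaskExecs, double averageProgress,
      double progress, int completes, long startTime, long now) {
    boolean lagging = averageProgress - progress >= SPECULATIVE_GAP;
    boolean oldEnough = now - startTime >= SPECULATIVE_LAG;
    if (isMap && activeAttempts <= maxTaskExecs && runSpeculative
        && lagging && oldEnough) {
      return true;
    }
    return runSpeculative && activeAttempts < MAX_CONCURRENT_TASKS
        && lagging && completes <= 0 && oldEnough;
  }

  // Mirrors hasRunOnMachine(): true if an active attempt of this TIP is on
  // the tracker (task id -> tracker map) or an attempt has failed there.
  static boolean hasRunOnMachine(Map<String, String> activeTaskToTracker,
      Set<String> machinesWhereFailed, String tracker) {
    return activeTaskToTracker.containsValue(tracker)
        || machinesWhereFailed.contains(tracker);
  }
}
```

For example, a reduce that is 0.4 behind the average after 61 seconds, with one active attempt and no completions, qualifies for a backup. Once two attempts are active, or before the 60-second lag has elapsed, it does not.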