Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jun 5 02:33:44 2012
@@ -65,6 +65,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.IntWritable;
@@ -72,50 +73,47 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.mapred.TaskController.DebugScriptContext;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerTaskPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerJobPathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.DeletionContext;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
-import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
-import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.mapreduce.util.ConfigUtil;
-import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
-import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
-import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
/*******************************************************
* TaskTracker is a process that starts and tracks MR Tasks
@@ -164,14 +162,15 @@ public class TaskTracker
public static final Log ClientTraceLog =
LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
- // Job ACLs file is created by TaskTracker under userlogs/$jobid directory for
- // each job at job localization time. This will be used by TaskLogServlet for
- // authorizing viewing of task logs of that job
+ //Job ACLs file is created by TaskController under userlogs/$jobid directory
+ //for each job at job localization time. This will be used by TaskLogServlet
+ //for authorizing viewing of task logs of that job
static String jobACLsFile = "job-acls.xml";
volatile boolean running = true;
private LocalDirAllocator localDirAllocator;
+ private String[] localdirs;
String taskTrackerName;
String localHostname;
InetSocketAddress jobTrackAddr;
@@ -241,11 +240,12 @@ public class TaskTracker
static final String DISTCACHEDIR = "distcache";
static final String JOBCACHE = "jobcache";
static final String OUTPUT = "output";
- private static final String JARSDIR = "jars";
+ static final String JARSDIR = "jars";
static final String LOCAL_SPLIT_FILE = "split.dta";
static final String LOCAL_SPLIT_META_FILE = "split.info";
static final String JOBFILE = "job.xml";
static final String JOB_TOKEN_FILE="jobToken"; //localized file
+ static final String TT_PRIVATE_DIR = "ttprivate";
static final String JOB_LOCAL_DIR = MRJobConfig.JOB_LOCAL_DIR;
@@ -445,7 +445,6 @@ public class TaskTracker
RunningJob rJob = null;
if (!runningJobs.containsKey(jobId)) {
rJob = new RunningJob(jobId);
- rJob.localized = false;
rJob.tasks = new HashSet<TaskInProgress>();
runningJobs.put(jobId, rJob);
} else {
@@ -454,7 +453,6 @@ public class TaskTracker
synchronized (rJob) {
rJob.tasks.add(tip);
}
- runningJobs.notify(); //notify the fetcher thread
return rJob;
}
}
@@ -512,22 +510,32 @@ public class TaskTracker
return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
}
+  /**
+   * Returns the location of the job's job.xml inside the TaskTracker-private
+   * area: {@link #TT_PRIVATE_DIR} followed by the regular per-job conf path.
+   */
+  static String getPrivateDirJobConfFile(String user, String jobid) {
+    final String jobConfFile = getLocalJobConfFile(user, jobid);
+    return TT_PRIVATE_DIR + Path.SEPARATOR + jobConfFile;
+  }
+
static String getLocalJobTokenFile(String user, String jobid) {
- return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR +
+ TaskTracker.JOB_TOKEN_FILE;
}
-
static String getTaskConfFile(String user, String jobid, String taskid,
boolean isCleanupAttempt) {
return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
+ Path.SEPARATOR + TaskTracker.JOBFILE;
}
+  /**
+   * Returns the task's directory inside the TaskTracker-private area:
+   * {@link #TT_PRIVATE_DIR} followed by the regular (non-cleanup) task dir.
+   * NOTE(review): the name suggests this holds the task launch script;
+   * confirm against the TaskController that consumes it.
+   */
+  static String getPrivateDirTaskScriptLocation(String user, String jobid,
+      String taskid) {
+    final String taskDir = getLocalTaskDir(user, jobid, taskid);
+    return TT_PRIVATE_DIR + Path.SEPARATOR + taskDir;
+  }
+
static String getJobJarsDir(String user, String jobid) {
return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
}
- static String getJobJarFile(String user, String jobid) {
+ public static String getJobJarFile(String user, String jobid) {
return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
}
@@ -551,7 +559,8 @@ public class TaskTracker
+ TaskTracker.OUTPUT;
}
- static String getLocalTaskDir(String user, String jobid, String taskid) {
+ public static String getLocalTaskDir(String user, String jobid,
+ String taskid) {
return getLocalTaskDir(user, jobid, taskid, false);
}
@@ -569,6 +578,19 @@ public class TaskTracker
String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt);
return dir + Path.SEPARATOR + MRConstants.WORKDIR;
}
+
+  /**
+   * Returns the location of the job's token file inside the
+   * TaskTracker-private area ({@link #TT_PRIVATE_DIR}).
+   */
+  static String getPrivateDirJobTokenFile(String user, String jobid) {
+    final String tokenFile = getLocalJobTokenFile(user, jobid);
+    return TT_PRIVATE_DIR + Path.SEPARATOR + tokenFile;
+  }
+
+  /**
+   * Returns the job's directory inside the TaskTracker-private area
+   * ({@link #TT_PRIVATE_DIR}), relative to a local directory root.
+   */
+  static String getPrivateDirForJob(String user, String jobid) {
+    final String jobDir = getLocalJobDir(user, jobid);
+    return TT_PRIVATE_DIR + Path.SEPARATOR + jobDir;
+  }
+
+  /**
+   * Returns the local directories recorded when the tracker was initialized.
+   * The internal array itself is returned (no defensive copy), so callers
+   * must treat it as read-only.
+   */
+  String[] getLocalDirs() {
+    return localdirs;
+  }
String getPid(TaskAttemptID tid) {
TaskInProgress tip = tasks.get(tid);
@@ -587,7 +609,47 @@ public class TaskTracker
protocol);
}
}
-
+
+  /**
+   * Delete all of the per-user directories under {@link #SUBDIR} on every
+   * local directory. Only entries whose name equals their owner are treated
+   * as user directories; each one is removed via
+   * {@link TaskController#deleteAsUser}.
+   * Volumes that have no {@link #SUBDIR} directory are skipped.
+   *
+   * @param conf the TT configuration (currently unused)
+   * @throws IOException if a local directory cannot be listed or the task
+   *         controller fails to delete a user directory
+   */
+  private void deleteUserDirectories(Configuration conf) throws IOException {
+    for (String root : localdirs) {
+      final Path userRoot = new Path(root, SUBDIR);
+      // A volume that has never hosted a task (e.g. on a fresh node) has no
+      // SUBDIR; listStatus would throw or return null for it, so skip it.
+      if (!localFs.exists(userRoot)) {
+        continue;
+      }
+      for (FileStatus status : localFs.listStatus(userRoot)) {
+        String owner = status.getOwner();
+        String path = status.getPath().getName();
+        if (path.equals(owner)) {
+          taskController.deleteAsUser(owner, "");
+        }
+      }
+    }
+  }
+
+  /**
+   * Schedules removal of TaskTracker state from the local directories:
+   * every entry under {@link #SUBDIR} on each local directory, plus
+   * {@link #TT_PRIVATE_DIR}. The paths are handed to the async disk
+   * service's moveAndDelete* methods rather than deleted inline.
+   *
+   * @throws IOException if a local directory cannot be listed or a path
+   *         cannot be scheduled for deletion
+   */
+  public void cleanupAllVolumes() throws IOException {
+    for (String volume : localdirs) {
+      final Path userRoot = new Path(volume, SUBDIR);
+      if (!localFs.exists(userRoot)) {
+        continue;
+      }
+      for (FileStatus userDir : localFs.listStatus(userRoot)) {
+        // moveAndDeleteRelativePath expects a path relative to the volume
+        // root, i.e. SUBDIR/<entry>, not the absolute path of SUBDIR.
+        String relativePath =
+            SUBDIR + Path.SEPARATOR + userDir.getPath().getName();
+        getAsyncDiskService().moveAndDeleteRelativePath(volume, relativePath);
+      }
+    }
+    // The "FromEachVolume" variant already covers every volume the disk
+    // service was built with, so call it once instead of once per local dir.
+    getAsyncDiskService().moveAndDeleteFromEachVolume(TT_PRIVATE_DIR);
+  }
int getHttpPort() {
return httpPort;
@@ -617,9 +679,18 @@ public class TaskTracker
// Check local disk, start async disk service, and clean up all
// local directories.
- checkLocalDirs(this.fConf.getLocalDirs());
- setAsyncDiskService(new MRAsyncDiskService(fConf));
- getAsyncDiskService().cleanupAllVolumes();
+ checkLocalDirs((localdirs = this.fConf.getLocalDirs()));
+ setAsyncDiskService(new MRAsyncDiskService(localFs,
+ taskController, fConf.getLocalDirs()));
+ cleanupAllVolumes();
+ final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
+ for (String s : localdirs) {
+ localFs.mkdirs(new Path(s, SUBDIR), ttdir);
+ }
+ final FsPermission priv = FsPermission.createImmutable((short) 0700);
+ for (String s : localdirs) {
+ localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
+ }
// Clear out state tables
this.tasks.clear();
@@ -677,15 +748,6 @@ public class TaskTracker
this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
LOG.info("Starting tracker " + taskTrackerName);
- Class<? extends TaskController> taskControllerClass = fConf.getClass(
- TT_TASK_CONTROLLER, DefaultTaskController.class, TaskController.class);
- taskController = (TaskController) ReflectionUtils.newInstance(
- taskControllerClass, fConf);
-
-
- // setup and create jobcache directory with appropriate permissions
- taskController.setup();
-
// Initialize DistributedCache
this.distributedCacheManager =
new TrackerDistributedCacheManager(this.fConf, taskController,
@@ -730,7 +792,7 @@ public class TaskTracker
reduceLauncher.start();
// create a localizer instance
- setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));
+ setLocalizer(new Localizer(localFs, fConf.getLocalDirs()));
//Start up node health checker service.
if (shouldStartHealthMonitor(this.fConf)) {
@@ -814,6 +876,9 @@ public class TaskTracker
List <FetchStatus> fList = new ArrayList<FetchStatus>();
for (Map.Entry <JobID, RunningJob> item : runningJobs.entrySet()) {
RunningJob rjob = item.getValue();
+ if (!rjob.localized) {
+ continue;
+ }
JobID jobId = item.getKey();
FetchStatus f;
synchronized (rjob) {
@@ -987,33 +1052,29 @@ public class TaskTracker
Task t = tip.getTask();
JobID jobId = t.getJobID();
RunningJob rjob = addTaskToJob(jobId, tip);
-
- // Initialize the user directories if needed.
- getLocalizer().initializeUserDirs(t.getUser());
+ InetSocketAddress ttAddr = getTaskTrackerReportAddress();
synchronized (rjob) {
if (!rjob.localized) {
-
- JobConf localJobConf = localizeJobFiles(t, rjob);
- // initialize job log directory
- initializeJobLogDir(jobId, localJobConf);
-
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of jars, job-work-dir. Note that initializeJob
- // should be the last call after every other directory/file to be
- // directly under the job directory is created.
- JobInitializationContext context = new JobInitializationContext();
- context.jobid = jobId;
- context.user = t.getUser();
- context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
- taskController.initializeJob(context);
-
+ Path localJobConfPath = initializeJob(t, rjob, ttAddr);
+ JobConf localJobConf = new JobConf(localJobConfPath);
+ // to be doubly sure, overwrite the user in the config with the one the
+ // TT thinks it is
+ localJobConf.setUser(t.getUser());
+ //also reset the #tasks per jvm
+ resetNumTasksPerJvm(localJobConf);
+ //set the base jobconf path in rjob; all tasks will use
+ //this as the base path when they run
+ rjob.localizedJobConf = localJobConfPath;
rjob.jobConf = localJobConf;
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
rjob.localized = true;
}
}
+ synchronized (runningJobs) {
+ runningJobs.notify(); //notify the fetcher thread
+ }
return rjob;
}
@@ -1032,31 +1093,32 @@ public class TaskTracker
* Localize the job on this tasktracker. Specifically
* <ul>
* <li>Cleanup and create job directories on all disks</li>
+ * <li>Download the credentials file</li>
* <li>Download the job config file job.xml from the FS</li>
- * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
- * in the configuration.
- * <li>Download the job jar file job.jar from the FS, unjar it and set jar
- * file in the configuration.</li>
+ * <li>Invokes the {@link TaskController} to do the rest of the job
+ * initialization</li>
* </ul>
- *
+ *
* @param t task whose job has to be localized on this TT
- * @return the modified job configuration to be used for all the tasks of this
- * job as a starting point.
+ * @param rjob the {@link RunningJob}
+ * @param ttAddr the tasktracker's RPC address
+ * @return the path to the job configuration to be used for all the tasks
+ * of this job as a starting point.
* @throws IOException
*/
- JobConf localizeJobFiles(Task t, RunningJob rjob)
- throws IOException, InterruptedException {
- JobID jobId = t.getJobID();
- String userName = t.getUser();
-
- // Initialize the job directories
- FileSystem localFs = FileSystem.getLocal(fConf);
- getLocalizer().initializeJobDirs(userName, jobId);
+ Path initializeJob(final Task t, final RunningJob rjob,
+ final InetSocketAddress ttAddr)
+ throws IOException, InterruptedException {
+ final JobID jobId = t.getJobID();
+
+ final Path jobFile = new Path(t.getJobFile());
+ final String userName = t.getUser();
+ final Configuration conf = getJobConf();
// save local copy of JobToken file
- String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
+ final String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
- Credentials ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+ Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
if (jt != null) { //could be null in the case of some unit tests
getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
@@ -1064,38 +1126,87 @@ public class TaskTracker
for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
rjob.ugi.addToken(token);
}
+ FileSystem userFs = getFS(jobFile, jobId, conf);
// Download the job.xml for this job from the system FS
- Path localJobFile =
- localizeJobConfFile(new Path(t.getJobFile()), userName, jobId);
+ final Path localJobFile =
+ localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
- JobConf localJobConf = new JobConf(localJobFile);
- //WE WILL TRUST THE USERNAME THAT WE GOT FROM THE JOBTRACKER
- //AS PART OF THE TASK OBJECT
- localJobConf.setUser(userName);
-
- // set the location of the token file into jobConf to transfer
- // the name to TaskRunner
- localJobConf.set(TokenCache.JOB_TOKENS_FILENAME,
- localJobTokenFile);
-
-
- // create the 'job-work' directory: job-specific shared directory for use as
- // scratch space by all tasks of the same job running on this TaskTracker.
- Path workDir =
- lDirAlloc.getLocalPathForWrite(getJobWorkDir(userName, jobId
- .toString()), fConf);
- if (!localFs.mkdirs(workDir)) {
- throw new IOException("Mkdirs failed to create "
- + workDir.toString());
- }
- System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
- localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
- // Download the job.jar for this job from the system FS
- localizeJobJarFile(userName, jobId, localFs, localJobConf);
-
- return localJobConf;
+ /**
+ * Now initialize the job via task-controller to do the rest of the
+ * job-init. Do this within a doAs since the public distributed cache
+ * is also set up here.
+ * To support potential authenticated HDFS accesses, we need the tokens
+ */
+ rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws IOException, InterruptedException {
+ try {
+ final JobConf localJobConf = new JobConf(localJobFile);
+ // Setup the public distributed cache
+ TaskDistributedCacheManager taskDistributedCacheManager =
+ getTrackerDistributedCacheManager()
+ .newTaskDistributedCacheManager(jobId, localJobConf);
+ rjob.distCacheMgr = taskDistributedCacheManager;
+ taskDistributedCacheManager.setupCache(localJobConf,
+ TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+
+ // Set some config values
+ localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+ getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+ if (conf.get("slave.host.name") != null) {
+ localJobConf.set("slave.host.name", conf.get("slave.host.name"));
+ }
+ resetNumTasksPerJvm(localJobConf);
+ localJobConf.setUser(t.getUser());
+
+ // write back the config (this config will have the updates that the
+ // distributed cache manager makes as well)
+ JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
+ taskController.initializeJob(t.getUser(), jobId.toString(),
+ new Path(localJobTokenFile), localJobFile, TaskTracker.this,
+ ttAddr);
+ } catch (IOException e) {
+ LOG.warn("Exception while localization " +
+ StringUtils.stringifyException(e));
+ throw e;
+ } catch (InterruptedException ie) {
+ LOG.warn("Exception while localization " +
+ StringUtils.stringifyException(ie));
+ throw ie;
+ }
+ return null;
+ }
+ });
+ //search for the conf that the initializeJob created
+ //need to look up certain configs from this conf, like
+ //the distributed cache, profiling, etc. ones
+ Path initializedConf = lDirAlloc.getLocalPathToRead(getLocalJobConfFile(
+ userName, jobId.toString()), getJobConf());
+ return initializedConf;
}
+  /**
+   * Disables JVM reuse (sets the number of tasks per JVM to 1) when the job
+   * enables any of: a map or reduce debug script, profiling, a keep-task-files
+   * pattern, or keeping failed task files. Does nothing if the job already
+   * runs one task per JVM.
+   *
+   * @param localJobConf the job configuration, adjusted in place
+   */
+  static void resetNumTasksPerJvm(JobConf localJobConf) {
+    if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+      return;
+    }
+    final boolean hasDebugScript =
+        localJobConf.getMapDebugScript() != null
+        || localJobConf.getReduceDebugScript() != null;
+    final String keepPattern = localJobConf.getKeepTaskFilesPattern();
+    final boolean mustDisableReuse = hasDebugScript
+        || localJobConf.getProfileEnabled()
+        || keepPattern != null
+        || localJobConf.getKeepFailedTaskFiles();
+    if (mustDisableReuse) {
+      localJobConf.setNumTasksToExecutePerJvm(1);
+    }
+  }
+
+
// Create job userlog dir.
// Create job acls file in job log dir, if needed.
void initializeJobLogDir(JobID jobId, JobConf localJobConf)
@@ -1163,15 +1274,15 @@ public class TaskTracker
/**
* Download the job configuration file from the FS.
*
- * @param t Task whose job file has to be downloaded
- * @param jobId jobid of the task
+ * @param jobFile the original location of the configuration file
+ * @param user the user in question
+ * @param userFs the FileSystem created on behalf of the user
+ * @param jobId jobid in question
* @return the local file system path of the downloaded file.
* @throws IOException
*/
- private Path localizeJobConfFile(Path jobFile, String user, JobID jobId)
- throws IOException, InterruptedException {
- final JobConf conf = new JobConf(getJobConf());
- FileSystem userFs = getFS(jobFile, jobId, conf);
+ private Path localizeJobConfFile(Path jobFile, String user,
+ FileSystem userFs, JobID jobId) throws IOException {
// Get sizes of JobFile
// sizes are -1 if they are not present.
FileStatus status = null;
@@ -1183,69 +1294,25 @@ public class TaskTracker
jobFileSize = -1;
}
- Path localJobFile =
- lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(user, jobId.toString()),
- jobFileSize, fConf);
+ Path localJobFile = lDirAlloc.getLocalPathForWrite(
+ getPrivateDirJobConfFile(user, jobId.toString()), jobFileSize, fConf);
// Download job.xml
userFs.copyToLocalFile(jobFile, localJobFile);
return localJobFile;
}
- /**
- * Download the job jar file from FS to the local file system and unjar it.
- * Set the local jar file in the passed configuration.
- *
- * @param jobId
- * @param localFs
- * @param localJobConf
- * @throws IOException
- */
- private void localizeJobJarFile(String user, JobID jobId, FileSystem localFs,
- JobConf localJobConf)
- throws IOException, InterruptedException {
- // copy Jar file to the local FS and unjar it.
- String jarFile = localJobConf.getJar();
- FileStatus status = null;
- long jarFileSize = -1;
- if (jarFile != null) {
- Path jarFilePath = new Path(jarFile);
- FileSystem fs = getFS(jarFilePath, jobId, localJobConf);
- try {
- status = fs.getFileStatus(jarFilePath);
- jarFileSize = status.getLen();
- } catch (FileNotFoundException fe) {
- jarFileSize = -1;
- }
- // Here we check for five times the size of jarFileSize to accommodate for
- // unjarring the jar file in the jars directory
- Path localJarFile =
- lDirAlloc.getLocalPathForWrite(
- getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf);
-
- // Download job.jar
- fs.copyToLocalFile(jarFilePath, localJarFile);
-
- localJobConf.setJar(localJarFile.toString());
-
- // Un-jar the parts of the job.jar that need to be added to the classpath
- RunJar.unJar(
- new File(localJarFile.toString()),
- new File(localJarFile.getParent().toString()),
- localJobConf.getJarUnpackPattern());
- }
- }
-
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
- UserGroupInformation ugi) throws IOException {
+ RunningJob rjob) throws IOException {
synchronized (tip) {
tip.setJobConf(jobConf);
- tip.setUGI(ugi);
- tip.launchTask();
+ tip.setUGI(rjob.ugi);
+ tip.launchTask(rjob);
}
}
- public synchronized void shutdown() throws IOException {
+ public synchronized void shutdown()
+ throws IOException, InterruptedException {
shuttingDown = true;
close();
if (this.server != null) {
@@ -1262,8 +1329,9 @@ public class TaskTracker
* any running tasks or threads, and cleanup disk space. A new TaskTracker
* within the same process space might be restarted, so everything must be
* clean.
+ * @throws InterruptedException
*/
- public synchronized void close() throws IOException {
+ public synchronized void close() throws IOException, InterruptedException {
//
// Kill running tasks. Do this in a 2nd vector, called 'tasksToClose',
// because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -1371,8 +1439,14 @@ public class TaskTracker
server.start();
this.httpPort = server.getPort();
checkJettyPort(httpPort);
+ Class<? extends TaskController> taskControllerClass =
+ conf.getClass("mapred.task.tracker.task-controller",
+ DefaultTaskController.class, TaskController.class);
+ taskController =
+ (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
+ taskController.setup(localDirAllocator);
// create task log cleanup thread
- setTaskLogCleanupThread(new UserLogCleaner(fConf));
+ setTaskLogCleanupThread(new UserLogCleaner(fConf, taskController));
UserGroupInformation.setConfiguration(fConf);
SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
@@ -1392,7 +1466,7 @@ public class TaskTracker
private void startCleanupThreads() throws IOException {
taskCleanupThread.setDaemon(true);
taskCleanupThread.start();
- directoryCleanupThread = new CleanupQueue();
+ directoryCleanupThread = CleanupQueue.getInstance();
// start tasklog cleanup thread
taskLogCleanupThread.setDaemon(true);
taskLogCleanupThread.start();
@@ -1873,7 +1947,6 @@ public class TaskTracker
ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
tip.reportDiagnosticInfo(msg);
myInstrumentation.timedoutTask(tip.getTask().getTaskID());
- dumpTaskStack(tip);
purgeTask(tip, true);
}
}
@@ -1881,86 +1954,6 @@ public class TaskTracker
}
/**
- * Builds list of PathDeletionContext objects for the given paths
- */
- private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
- Path[] paths) {
- int i = 0;
- PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
-
- for (Path p : paths) {
- contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
- }
- return contexts;
- }
-
- /**
- * Builds list of {@link TaskControllerJobPathDeletionContext} objects for a
- * job each pointing to the job's jobLocalDir.
- * @param fs : FileSystem in which the dirs to be deleted
- * @param paths : mapred-local-dirs
- * @param id : {@link JobID} of the job for which the local-dir needs to
- * be cleaned up.
- * @param user : Job owner's username
- * @param taskController : the task-controller to be used for deletion of
- * jobLocalDir
- */
- static PathDeletionContext[] buildTaskControllerJobPathDeletionContexts(
- FileSystem fs, Path[] paths, JobID id, String user,
- TaskController taskController)
- throws IOException {
- int i = 0;
- PathDeletionContext[] contexts =
- new TaskControllerPathDeletionContext[paths.length];
-
- for (Path p : paths) {
- contexts[i++] = new TaskControllerJobPathDeletionContext(fs, p, id, user,
- taskController);
- }
- return contexts;
- }
-
- /**
- * Builds list of TaskControllerTaskPathDeletionContext objects for a task
- * @param fs : FileSystem in which the dirs to be deleted
- * @param paths : mapred-local-dirs
- * @param task : the task whose taskDir or taskWorkDir is going to be deleted
- * @param isWorkDir : the dir to be deleted is workDir or taskDir
- * @param taskController : the task-controller to be used for deletion of
- * taskDir or taskWorkDir
- */
- static PathDeletionContext[] buildTaskControllerTaskPathDeletionContexts(
- FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
- TaskController taskController)
- throws IOException {
- int i = 0;
- PathDeletionContext[] contexts =
- new TaskControllerPathDeletionContext[paths.length];
-
- for (Path p : paths) {
- contexts[i++] = new TaskControllerTaskPathDeletionContext(fs, p, task,
- isWorkDir, taskController);
- }
- return contexts;
- }
-
- /**
- * Send a signal to a stuck task commanding it to dump stack traces
- * to stderr before we kill it with purgeTask().
- *
- * @param tip {@link TaskInProgress} to dump stack traces.
- */
- private void dumpTaskStack(TaskInProgress tip) {
- TaskRunner runner = tip.getTaskRunner();
- if (null == runner) {
- return; // tip is already abandoned.
- }
-
- JvmManager jvmMgr = runner.getJvmManager();
- jvmMgr.dumpStack(runner);
- }
-
- /**
* The task tracker is done with this job, so we need to clean up.
* @param action The action with the job
* @throws IOException
@@ -1976,7 +1969,12 @@ public class TaskTracker
if (rjob == null) {
LOG.warn("Unknown job " + jobId + " being deleted.");
} else {
- synchronized (rjob) {
+ synchronized (rjob) {
+ // decrement the reference counts for the items this job references
+ rjob.distCacheMgr.release();
+ // inform the cache manager that the job is done
+ getTrackerDistributedCacheManager()
+ .deleteTaskDistributedCacheManager(jobId);
// Add this tips of this job to queue of tasks to be purged
for (TaskInProgress tip : rjob.tasks) {
tip.jobHasFinished(false);
@@ -1988,7 +1986,7 @@ public class TaskTracker
// Delete the job directory for this
// task if the job is done/failed
if (!rjob.keepJobFiles) {
- removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
+ removeJobFiles(rjob.ugi.getShortUserName(), rjob.getJobID());
}
// add job to taskLogCleanupThread
long now = System.currentTimeMillis();
@@ -2016,15 +2014,25 @@ public class TaskTracker
/**
* This job's files are no longer needed on this TT, remove them.
*
- * @param rjob
+ * @param user User who ran the job
+ * @param jobId Remove local work dirs for this job
* @throws IOException
*/
- void removeJobFiles(String user, JobID jobId)
- throws IOException {
- PathDeletionContext[] contexts =
- buildTaskControllerJobPathDeletionContexts(localFs,
- getLocalFiles(fConf, ""), jobId, user, taskController);
- directoryCleanupThread.addToQueue(contexts);
+ void removeJobFiles(String user, JobID jobId) throws IOException {
+ String jobDir = getLocalJobDir(user, jobId.toString());
+ PathDeletionContext jobCleanup =
+ new TaskController.DeletionContext(getTaskController(), false, user,
+ jobDir,
+ localdirs);
+ directoryCleanupThread.addToQueue(jobCleanup);
+
+ for (String str : localdirs) {
+ Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
+ new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
+ PathDeletionContext ttPrivateJobCleanup =
+ new CleanupQueue.PathDeletionContext(ttPrivateJobDir, fConf);
+ directoryCleanupThread.addToQueue(ttPrivateJobCleanup);
+ }
}
/**
@@ -2329,12 +2337,14 @@ public class TaskTracker
* Start a new task.
* All exceptions are handled locally, so that we don't mess up the
* task tracker.
+ * @throws InterruptedException
*/
- void startNewTask(TaskInProgress tip) {
+ void startNewTask(TaskInProgress tip) throws InterruptedException {
try {
RunningJob rjob = localizeJob(tip);
+ tip.getTask().setJobFile(rjob.localizedJobConf.toString());
// Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
- launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob.ugi);
+ launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob);
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
@@ -2344,8 +2354,9 @@ public class TaskTracker
tip.kill(true);
tip.cleanup(true);
} catch (IOException ie2) {
- LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
- StringUtils.stringifyException(ie2));
+ LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+ } catch (InterruptedException ie2) {
+ LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
}
// Careful!
@@ -2466,7 +2477,6 @@ public class TaskTracker
private TaskRunner runner;
volatile boolean done = false;
volatile boolean wasKilled = false;
- private JobConf defaultJobConf;
private JobConf localJobConf;
private boolean keepFailedTaskFiles;
private boolean alwaysKeepTaskFiles;
@@ -2498,7 +2508,6 @@ public class TaskTracker
this.task = task;
this.launcher = launcher;
this.lastProgressReport = System.currentTimeMillis();
- this.defaultJobConf = conf;
localJobConf = null;
taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(),
0.0f,
@@ -2516,66 +2525,9 @@ public class TaskTracker
}
void localizeTask(Task task) throws IOException{
-
- FileSystem localFs = FileSystem.getLocal(fConf);
-
- // create taskDirs on all the disks.
- getLocalizer().initializeAttemptDirs(task.getUser(),
- task.getJobID().toString(), task.getTaskID().toString(),
- task.isTaskCleanupTask());
-
- // create the working-directory of the task
- Path cwd =
- lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getUser(), task
- .getJobID().toString(), task.getTaskID().toString(), task
- .isTaskCleanupTask()), defaultJobConf);
- if (!localFs.mkdirs(cwd)) {
- throw new IOException("Mkdirs failed to create "
- + cwd.toString());
- }
-
- localJobConf.set(LOCAL_DIR,
- fConf.get(LOCAL_DIR));
-
- if (fConf.get(TT_HOST_NAME) != null) {
- localJobConf.set(TT_HOST_NAME, fConf.get(TT_HOST_NAME));
- }
-
- keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
-
// Do the task-type specific localization
+ //TODO: are these calls really required
task.localizeConfiguration(localJobConf);
-
- List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
- if (staticResolutions != null && staticResolutions.size() > 0) {
- StringBuffer str = new StringBuffer();
-
- for (int i = 0; i < staticResolutions.size(); i++) {
- String[] hostToResolved = staticResolutions.get(i);
- str.append(hostToResolved[0]+"="+hostToResolved[1]);
- if (i != staticResolutions.size() - 1) {
- str.append(',');
- }
- }
- localJobConf.set(TT_STATIC_RESOLUTIONS, str.toString());
- }
- if (task.isMapTask()) {
- debugCommand = localJobConf.getMapDebugScript();
- } else {
- debugCommand = localJobConf.getReduceDebugScript();
- }
- String keepPattern = localJobConf.getKeepTaskFilesPattern();
- if (keepPattern != null) {
- alwaysKeepTaskFiles =
- Pattern.matches(keepPattern, task.getTaskID().toString());
- } else {
- alwaysKeepTaskFiles = false;
- }
- if (debugCommand != null || localJobConf.getProfileEnabled() ||
- alwaysKeepTaskFiles || keepFailedTaskFiles) {
- //disable jvm reuse
- localJobConf.setNumTasksToExecutePerJvm(1);
- }
task.setConf(localJobConf);
}
@@ -2598,6 +2550,18 @@ public class TaskTracker
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
taskTimeout = localJobConf.getLong(MRJobConfig.TASK_TIMEOUT,
10 * 60 * 1000);
+ if (task.isMapTask()) {
+ debugCommand = localJobConf.getMapDebugScript();
+ } else {
+ debugCommand = localJobConf.getReduceDebugScript();
+ }
+ String keepPattern = localJobConf.getKeepTaskFilesPattern();
+ if (keepPattern != null) {
+ alwaysKeepTaskFiles =
+ Pattern.matches(keepPattern, task.getTaskID().toString());
+ } else {
+ alwaysKeepTaskFiles = false;
+ }
}
public synchronized JobConf getJobConf() {
@@ -2618,7 +2582,7 @@ public class TaskTracker
/**
* Kick off the task execution
*/
- public synchronized void launchTask() throws IOException {
+ public synchronized void launchTask(RunningJob rjob) throws IOException {
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
@@ -2626,7 +2590,7 @@ public class TaskTracker
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
this.taskStatus.setRunState(TaskStatus.State.RUNNING);
}
- setTaskRunner(task.createRunner(TaskTracker.this, this));
+ setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
this.runner.start();
this.taskStatus.setStartTime(System.currentTimeMillis());
} else {
@@ -2886,11 +2850,6 @@ public class TaskTracker
} catch(Exception e){
LOG.warn("Exception finding task's stdout/err/syslog files", e);
}
- File workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID()
- .toString(), task.getTaskID().toString(), task
- .isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR, localJobConf).toString());
// Build the command
File stdout = TaskLog.getTaskLogFile(task.getTaskID(), task
.isTaskCleanupTask(), TaskLog.LogName.DEBUGOUT);
@@ -2914,13 +2873,14 @@ public class TaskTracker
vargs.add(taskSyslog);
vargs.add(jobConf);
vargs.add(program);
- DebugScriptContext context =
- new TaskController.DebugScriptContext();
- context.args = vargs;
- context.stdout = stdout;
- context.workDir = workDir;
- context.task = task;
- getTaskController().runDebugScript(context);
+ // TODO need to fix debug script
+ //DebugScriptContext context =
+ // new TaskController.DebugScriptContext();
+ //context.args = vargs;
+ //context.stdout = stdout;
+ //context.workDir = workDir;
+ //context.task = task;
+ //getTaskController().runDebugScript(context);
// add the lines of debug out to diagnostics
int num = localJobConf.getInt(MRJobConfig.TASK_DEBUGOUT_LINES, -1);
addDiagnostics(FileUtil.makeShellPath(stdout), num, "DEBUG OUT");
@@ -3001,7 +2961,12 @@ public class TaskTracker
getRunState() == TaskStatus.State.UNASSIGNED ||
getRunState() == TaskStatus.State.COMMIT_PENDING ||
isCleaningup()) {
- kill(wasFailure);
+ try {
+ kill(wasFailure);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while killing " +
+ getTask().getTaskID(), e);
+ }
}
}
@@ -3013,8 +2978,10 @@ public class TaskTracker
* Something went wrong and the task must be killed.
*
* @param wasFailure was it a failure (versus a kill request)?
+ * @throws InterruptedException
*/
- public synchronized void kill(boolean wasFailure) throws IOException {
+ public synchronized void kill(boolean wasFailure
+ ) throws IOException, InterruptedException {
if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
isCleaningup()) {
@@ -3118,7 +3085,7 @@ public class TaskTracker
return;
}
try {
- removeTaskFiles(needCleanup, taskId);
+ removeTaskFiles(needCleanup);
} catch (Throwable ie) {
LOG.info("Error cleaning up task runner: "
+ StringUtils.stringifyException(ie));
@@ -3130,47 +3097,26 @@ public class TaskTracker
* Some or all of the files from this task are no longer required. Remove
* them via CleanupQueue.
*
- * @param needCleanup
+ * @param removeOutputs if true, delete the whole local task directory (outputs included); otherwise delete only the task's work directory
* @param taskId
* @throws IOException
*/
- void removeTaskFiles(boolean needCleanup, TaskAttemptID taskId)
- throws IOException {
- if (needCleanup) {
- if (runner != null) {
- // cleans up the output directory of the task (where map outputs
- // and reduce inputs get stored)
- runner.close();
- }
-
- if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- // No jvm reuse, remove everything
- PathDeletionContext[] contexts =
- buildTaskControllerTaskPathDeletionContexts(localFs,
- getLocalFiles(fConf, ""), task, false/* not workDir */,
- taskController);
- directoryCleanupThread.addToQueue(contexts);
+ void removeTaskFiles(boolean removeOutputs) throws IOException {
+ if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ String user = ugi.getShortUserName();
+ String jobId = task.getJobID().toString();
+ String taskId = task.getTaskID().toString();
+ boolean cleanup = task.isTaskCleanupTask();
+ String taskDir;
+ if (!removeOutputs) {
+ taskDir = TaskTracker.getTaskWorkDir(user, jobId, taskId, cleanup);
} else {
- // Jvm reuse. We don't delete the workdir since some other task
- // (running in the same JVM) might be using the dir. The JVM
- // running the tasks would clean the workdir per a task in the
- // task process itself.
- String localTaskDir =
- getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
- .toString(), task.isTaskCleanupTask());
- PathDeletionContext[] contexts = buildPathDeletionContexts(
- localFs, getLocalFiles(defaultJobConf, localTaskDir +
- Path.SEPARATOR + TaskTracker.JOBFILE));
- directoryCleanupThread.addToQueue(contexts);
- }
- } else {
- if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- PathDeletionContext[] contexts =
- buildTaskControllerTaskPathDeletionContexts(localFs,
- getLocalFiles(fConf, ""), task, true /* workDir */,
- taskController);
- directoryCleanupThread.addToQueue(contexts);
+ taskDir = TaskTracker.getLocalTaskDir(user, jobId, taskId, cleanup);
}
+ PathDeletionContext item =
+ new TaskController.DeletionContext(taskController, false, user,
+ taskDir, localdirs);
+ directoryCleanupThread.addToQueue(item);
}
}
@@ -3227,7 +3173,11 @@ public class TaskTracker
if (rjob == null) { //kill the JVM since the job is dead
LOG.info("Killing JVM " + jvmId + " since job " + jvmId.getJobId() +
" is dead");
- jvmManager.killJvm(jvmId);
+ try {
+ jvmManager.killJvm(jvmId);
+ } catch (InterruptedException e) {
+ LOG.warn("Failed to kill " + jvmId, e);
+ }
return new JvmTask(null, true);
}
TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
@@ -3448,12 +3398,15 @@ public class TaskTracker
static class RunningJob{
private JobID jobid;
private JobConf jobConf;
+ private Path localizedJobConf;
// keep this for later use
volatile Set<TaskInProgress> tasks;
- boolean localized;
+ volatile boolean localized;
boolean keepJobFiles;
UserGroupInformation ugi;
FetchStatus f;
+ TaskDistributedCacheManager distCacheMgr;
+
RunningJob(JobID jobid) {
this.jobid = jobid;
localized = false;
@@ -4118,42 +4071,54 @@ public class TaskTracker
return distributedCacheManager;
}
- /**
- * Download the job-token file from the FS and save on local fs.
- * @param user
- * @param jobId
- * @param jobConf
- * @return the local file system path of the downloaded file.
- * @throws IOException
- */
- private String localizeJobTokenFile(String user, JobID jobId)
- throws IOException {
- // check if the tokenJob file is there..
- Path skPath = new Path(systemDirectory,
- jobId.toString()+"/"+TokenCache.JOB_TOKEN_HDFS_FILE);
-
- FileStatus status = null;
- long jobTokenSize = -1;
- status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
- jobTokenSize = status.getLen();
-
- Path localJobTokenFile =
- lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user,
- jobId.toString()), jobTokenSize, fConf);
- String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
- LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
- " to " + localJobTokenFileStr);
-
- // Download job_token
- systemFS.copyToLocalFile(skPath, localJobTokenFile);
- return localJobTokenFileStr;
- }
-
- JobACLsManager getJobACLsManager() {
- return aclsManager.getJobACLsManager();
- }
+ /**
+ * Download the job-token file from the FS and save on local fs.
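+ * @param user the job's user; selects the private local directory the token is written to
+ * @param jobId the job whose token file is downloaded
+ * (the source file is <systemDirectory>/<jobId>/TokenCache.JOB_TOKEN_HDFS_FILE)
+ * @return the local file system path of the downloaded file.
+ * @throws IOException if the token file does not exist in the system directory or cannot be copied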
+ */
+ private String localizeJobTokenFile(String user, JobID jobId)
+ throws IOException {
+ // check if the tokenJob file is there..
+ Path skPath = new Path(systemDirectory,
+ jobId.toString()+"/"+TokenCache.JOB_TOKEN_HDFS_FILE);
- ACLsManager getACLsManager() {
- return aclsManager;
- }
+ FileStatus status = null;
+ long jobTokenSize = -1;
+ status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
+ jobTokenSize = status.getLen();
+
+ Path localJobTokenFile =
+ lDirAlloc.getLocalPathForWrite(getPrivateDirJobTokenFile(user,
+ jobId.toString()), jobTokenSize, fConf);
+ String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
+ LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
+ " to " + localJobTokenFileStr);
+
+ // Download job_token
+ systemFS.copyToLocalFile(skPath, localJobTokenFile);
+ return localJobTokenFileStr;
+ }
+
+ JobACLsManager getJobACLsManager() {
+ return aclsManager.getJobACLsManager();
+ }
+
+ ACLsManager getACLsManager() {
+ return aclsManager;
+ }
+
+ synchronized TaskInProgress getRunningTask(TaskAttemptID tid) {
+ return runningTasks.get(tid);
+ }
+
+ @Override
+ public void updatePrivateDistributedCacheSizes(
+ org.apache.hadoop.mapreduce.JobID jobId, long[] sizes)
+ throws IOException {
+ distributedCacheManager.setArchiveSizes(jobId, sizes);
+ }
+
}
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Jun 5 02:33:44 2012
@@ -161,4 +161,13 @@ public interface TaskUmbilicalProtocol e
TaskAttemptID id)
throws IOException;
+ /**
+ * The job initializer needs to report the sizes of the archive
+ * objects in the private distributed cache.
+ * @param jobId the job to update
+ * @param sizes the array of sizes that were computed
+ * @throws IOException
+ */
+ void updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+ long[] sizes) throws IOException;
}
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/UserLogCleaner.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/UserLogCleaner.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/UserLogCleaner.java Tue Jun 5 02:33:44 2012
@@ -32,6 +32,9 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
@@ -53,15 +56,19 @@ class UserLogCleaner extends Thread {
private Map<JobID, Long> completedJobs = Collections
.synchronizedMap(new HashMap<JobID, Long>());
private final long threadSleepTime;
- private MRAsyncDiskService logAsyncDisk;
private Clock clock;
+ private TaskController taskController;
+ private CleanupQueue cleanupQueue;
+ private FileSystem localFs;
- UserLogCleaner(Configuration conf) throws IOException {
+ UserLogCleaner(Configuration conf, TaskController taskController)
+ throws IOException {
threadSleepTime = conf.getLong(TTConfig.TT_USERLOGCLEANUP_SLEEPTIME,
DEFAULT_THREAD_SLEEP_TIME);
- logAsyncDisk = new MRAsyncDiskService(FileSystem.getLocal(conf), TaskLog
- .getUserLogDir().toString());
setClock(new Clock());
+ localFs = FileSystem.getLocal(conf);
+ this.taskController = taskController;
+ cleanupQueue = CleanupQueue.getInstance();
}
void setClock(Clock clock) {
@@ -100,7 +107,7 @@ class UserLogCleaner extends Thread {
// see if the job is old enough
if (entry.getValue().longValue() <= now) {
// add the job logs directory to for delete
- deleteLogPath(TaskLog.getJobDir(entry.getKey()).getAbsolutePath());
+ deleteLogPath(entry.getKey().toString());
completedJobIter.remove();
}
}
@@ -125,16 +132,12 @@ class UserLogCleaner extends Thread {
// add all the log dirs to taskLogsMnonitor.
long now = clock.getTime();
for (String logDir : logDirs) {
- if (logDir.equals(logAsyncDisk.TOBEDELETED)) {
- // skip this
- continue;
- }
JobID jobid = null;
try {
jobid = JobID.forName(logDir);
} catch (IllegalArgumentException ie) {
// if the directory is not a jobid, delete it immediately
- deleteLogPath(new File(userLogDir, logDir).getAbsolutePath());
+ deleteLogPath(logDir);
continue;
}
// add the job log directory with default retain hours, if it is not
@@ -197,6 +200,11 @@ class UserLogCleaner extends Thread {
*/
private void deleteLogPath(String logPath) throws IOException {
LOG.info("Deleting user log path " + logPath);
- logAsyncDisk.moveAndDeleteAbsolutePath(logPath);
+ String logRoot = TaskLog.getUserLogDir().toString();
+ String user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+ PathDeletionContext item =
+ new TaskController.DeletionContext(taskController, true, user, logPath,
+ null);
+ cleanupQueue.addToQueue(item);
}
}
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Jun 5 02:33:44 2012
@@ -265,7 +265,7 @@ public interface JobContext extends MRJo
* @return a string array of timestamps
* @throws IOException
*/
- public String[] getArchiveTimestamps();
+ public long[] getArchiveTimestamps();
/**
* Get the timestamps of the files. Used by internal
@@ -273,7 +273,7 @@ public interface JobContext extends MRJo
* @return a string array of timestamps
* @throws IOException
*/
- public String[] getFileTimestamps();
+ public long[] getFileTimestamps();
/**
* Get the configured number of maximum attempts that will be made to run a
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java Tue Jun 5 02:33:44 2012
@@ -118,4 +118,4 @@ public class JobSubmissionFiles {
return stagingArea;
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Tue Jun 5 02:33:44 2012
@@ -26,9 +26,6 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.DefaultTaskController;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import java.net.URI;
@@ -134,175 +131,6 @@ import java.net.URI;
@Deprecated
@InterfaceAudience.Private
public class DistributedCache {
- /**
- * Get the locally cached file or archive; it could either be
- * previously cached (and valid) or copy it from the {@link FileSystem} now.
- *
- * @param cache the cache to be localized, this should be specified as
- * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
- * @param conf The Confguration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the files/archives
- * @param fileStatus The file status on the dfs.
- * @param isArchive if the cache is an archive or a file. In case it is an
- * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
- * be unzipped/unjarred/untarred automatically
- * and the directory where the archive is unzipped/unjarred/untarred is
- * returned as the Path.
- * In case of a file, the path to the file is returned
- * @param confFileStamp this is the hdfs file modification timestamp to verify that the
- * file to be cached hasn't changed since the job started
- * @param currentWorkDir this is the directory where you would want to create symlinks
- * for the locally cached files/archives
- * @return the path to directory where the archives are unjarred in case of archives,
- * the path to the file where the file is copied locally
- * @throws IOException
- * @deprecated Internal to MapReduce framework.
- * Use TrackerDistributedCacheManager instead.
- */
- @Deprecated
- public static Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, FileStatus fileStatus,
- boolean isArchive, long confFileStamp,
- Path currentWorkDir)
- throws IOException {
- return getLocalCache(cache, conf, baseDir, fileStatus, isArchive,
- confFileStamp, currentWorkDir, true);
- }
-
- /**
- * Get the locally cached file or archive; it could either be
- * previously cached (and valid) or copy it from the {@link FileSystem} now.
- *
- * @param cache the cache to be localized, this should be specified as
- * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
- * @param conf The Confguration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the files/archives
- * @param fileStatus The file status on the dfs.
- * @param isArchive if the cache is an archive or a file. In case it is an
- * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
- * be unzipped/unjarred/untarred automatically
- * and the directory where the archive is unzipped/unjarred/untarred is
- * returned as the Path.
- * In case of a file, the path to the file is returned
- * @param confFileStamp this is the hdfs file modification timestamp to verify that the
- * file to be cached hasn't changed since the job started
- * @param currentWorkDir this is the directory where you would want to create symlinks
- * for the locally cached files/archives
- * @param honorSymLinkConf if this is false, then the symlinks are not
- * created even if conf says so (this is required for an optimization in task
- * launches
- * @return the path to directory where the archives are unjarred in case of archives,
- * the path to the file where the file is copied locally
- * @throws IOException
- * @deprecated Internal to MapReduce framework.
- * Use TrackerDistributedCacheManager instead.
- */
- @Deprecated
- public static Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, FileStatus fileStatus,
- boolean isArchive, long confFileStamp,
- Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
-
- return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
- confFileStamp, currentWorkDir, honorSymLinkConf, false);
- }
-
- /**
- * Get the locally cached file or archive; it could either be
- * previously cached (and valid) or copy it from the {@link FileSystem} now.
- *
- * @param cache the cache to be localized, this should be specified as
- * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
- * @param conf The Confguration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the files/archives
- * @param isArchive if the cache is an archive or a file. In case it is an
- * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
- * be unzipped/unjarred/untarred automatically
- * and the directory where the archive is unzipped/unjarred/untarred
- * is returned as the Path.
- * In case of a file, the path to the file is returned
- * @param confFileStamp this is the hdfs file modification timestamp to verify that the
- * file to be cached hasn't changed since the job started
- * @param currentWorkDir this is the directory where you would want to create symlinks
- * for the locally cached files/archives
- * @return the path to directory where the archives are unjarred in case of archives,
- * the path to the file where the file is copied locally
- * @throws IOException
- * @deprecated Internal to MapReduce framework.
- * Use TrackerDistributedCacheManager instead.
- */
- @Deprecated
- public static Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, boolean isArchive,
- long confFileStamp, Path currentWorkDir)
- throws IOException {
- return getLocalCache(cache, conf,
- baseDir, null, isArchive,
- confFileStamp, currentWorkDir);
- }
-
- /**
- * This is the opposite of getlocalcache. When you are done with
- * using the cache, you need to release the cache
- * @param cache The cache URI to be released
- * @param conf configuration which contains the filesystem the cache
- * is contained in.
- * @throws IOException
- * @deprecated Internal to MapReduce framework.
- * Use TrackerDistributedCacheManager instead.
- */
- @Deprecated
- public static void releaseCache(URI cache, Configuration conf)
- throws IOException {
- // find the timestamp of the uri
- URI[] archives = DistributedCache.getCacheArchives(conf);
- URI[] files = DistributedCache.getCacheFiles(conf);
- String[] archivesTimestamps =
- DistributedCache.getArchiveTimestamps(conf);
- String[] filesTimestamps =
- DistributedCache.getFileTimestamps(conf);
- String timestamp = null;
- if (archives != null) {
- for (int i = 0; i < archives.length; i++) {
- if (archives[i].equals(cache)) {
- timestamp = archivesTimestamps[i];
- break;
- }
- }
- }
- if (timestamp == null && files != null) {
- for (int i = 0; i < files.length; i++) {
- if (files[i].equals(cache)) {
- timestamp = filesTimestamps[i];
- break;
- }
- }
- }
- if (timestamp == null) {
- throw new IOException("TimeStamp of the uri couldnot be found");
- }
- new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .releaseCache(cache, conf, Long.parseLong(timestamp),
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
- }
-
- /**
- * Returns the relative path of the dir this cache will be localized in
- * relative path that this cache will be localized in. For
- * hdfs://hostname:port/absolute_path -- the relative path is
- * hostname/absolute path -- if it is just /absolute_path -- then the
- * relative path is hostname of DFS this mapred cluster is running
- * on/absolute_path
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
- */
- @Deprecated
- public static String makeRelative(URI cache, Configuration conf)
- throws IOException {
- return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .makeRelative(cache, conf);
- }
/**
* Returns mtime of a given cache file on hdfs.
@@ -361,11 +189,22 @@ public class DistributedCache {
conf.set(MRJobConfig.CACHE_FILES, sfiles);
}
+ private static Path[] parsePaths(String[] strs) {
+ if (strs == null) {
+ return null;
+ }
+ Path[] result = new Path[strs.length];
+ for(int i=0; i < strs.length; ++i) {
+ result[i] = new Path(strs[i]);
+ }
+ return result;
+ }
+
/**
* Get cache archives set in the Configuration. Used by
* internal DistributedCache and MapReduce code.
* @param conf The configuration which contains the archives
- * @return A URI array of the caches set in the Configuration
+ * @return An array of the caches set in the Configuration
* @throws IOException
* @deprecated Use {@link JobContext#getCacheArchives()} instead
*/
@@ -378,7 +217,7 @@ public class DistributedCache {
* Get cache files set in the Configuration. Used by internal
* DistributedCache and MapReduce code.
* @param conf The configuration which contains the files
- * @return A URI array of the files set in the Configuration
+ * @return An array of the files set in the Configuration
* @throws IOException
* @deprecated Use {@link JobContext#getCacheFiles()} instead
*/
@@ -417,30 +256,46 @@ public class DistributedCache {
}
/**
+ * Parse a list of strings into longs.
+ * @param strs the list of strings to parse
+ * @return an array of the parsed longs, the same length as strs, or null if strs is null
+ */
+ private static long[] parseTimestamps(String[] strs) {
+ if (strs == null) {
+ return null;
+ }
+ long[] result = new long[strs.length];
+ for(int i=0; i < strs.length; ++i) {
+ result[i] = Long.parseLong(strs[i]);
+ }
+ return result;
+ }
+
+ /**
* Get the timestamps of the archives. Used by internal
* DistributedCache and MapReduce code.
* @param conf The configuration which stored the timestamps
- * @return a string array of timestamps
+ * @return a long array of timestamps
* @throws IOException
* @deprecated Use {@link JobContext#getArchiveTimestamps()} instead
*/
@Deprecated
- public static String[] getArchiveTimestamps(Configuration conf) {
- return conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS);
+ public static long[] getArchiveTimestamps(Configuration conf) {
+ return
+ parseTimestamps(conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS));
}
-
/**
* Get the timestamps of the files. Used by internal
* DistributedCache and MapReduce code.
* @param conf The configuration which stored the timestamps
- * @return a string array of timestamps
+ * @return a long array of timestamps
* @throws IOException
* @deprecated Use {@link JobContext#getFileTimestamps()} instead
*/
@Deprecated
- public static String[] getFileTimestamps(Configuration conf) {
- return conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+ public static long[] getFileTimestamps(Configuration conf) {
+ return parseTimestamps(conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS));
}
/**
@@ -511,8 +366,8 @@ public class DistributedCache {
@Deprecated
public static void addCacheArchive(URI uri, Configuration conf) {
String archives = conf.get(MRJobConfig.CACHE_ARCHIVES);
- conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString()
- : archives + "," + uri.toString());
+ conf.set(MRJobConfig.CACHE_ARCHIVES,
+ archives == null ? uri.toString() : archives + "," + uri.toString());
}
/**
@@ -525,8 +380,32 @@ public class DistributedCache {
@Deprecated
public static void addCacheFile(URI uri, Configuration conf) {
String files = conf.get(MRJobConfig.CACHE_FILES);
- conf.set(MRJobConfig.CACHE_FILES, files == null ? uri.toString() : files + ","
- + uri.toString());
+ conf.set(MRJobConfig.CACHE_FILES,
+ files == null ? uri.toString() : files + "," + uri.toString());
+ }
+
+ /**
+ * Add an archive that has been localized to the conf. Used
+ * by internal DistributedCache code.
+ * @param conf The conf to modify to contain the localized caches
+ * @param str a comma separated list of local archives
+ */
+ public static void addLocalArchives(Configuration conf, String str) {
+ String archives = conf.get(MRJobConfig.CACHE_LOCALARCHIVES);
+ conf.set(MRJobConfig.CACHE_LOCALARCHIVES,
+ archives == null ? str : archives + "," + str);
+ }
+
+ /**
+ * Add a file that has been localized to the conf. Used
+ * by internal DistributedCache code.
+ * @param conf The conf to modify to contain the localized caches
+ * @param str a comma separated list of local files
+ */
+ public static void addLocalFiles(Configuration conf, String str) {
+ String files = conf.get(MRJobConfig.CACHE_LOCALFILES);
+ conf.set(MRJobConfig.CACHE_LOCALFILES,
+ files == null ? str : files + "," + str);
}
/**
@@ -541,8 +420,8 @@ public class DistributedCache {
public static void addFileToClassPath(Path file, Configuration conf)
throws IOException {
String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
- conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
- : classpath + "," + file.toString());
+ conf.set(MRJobConfig.CLASSPATH_FILES,
+ classpath == null ? file.toString() : classpath + "," + file);
FileSystem fs = FileSystem.get(conf);
URI uri = fs.makeQualified(file).toUri();
@@ -654,17 +533,4 @@ public class DistributedCache {
public static boolean checkURIs(URI[] uriFiles, URI[] uriArchives){
+    // Thin delegate: the check itself is implemented by TrackerDistributedCacheManager.checkURIs.
return TrackerDistributedCacheManager.checkURIs(uriFiles, uriArchives);
}
-
- /**
- * Clear the entire contents of the cache and delete the backing files. This
- * should only be used when the server is reinitializing, because the users
- * are going to lose their files.
- * @deprecated Internal to MapReduce framework.
- * Use TrackerDistributedCacheManager instead.
- */
- @Deprecated
- public static void purgeCache(Configuration conf) throws IOException {
- new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .purgeCache();
- }
}
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Tue Jun 5 02:33:44 2012
@@ -30,8 +30,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager.CacheStatus;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -48,10 +51,9 @@ import org.apache.hadoop.classification.
@InterfaceAudience.Private
public class TaskDistributedCacheManager {
private final TrackerDistributedCacheManager distributedCacheManager;
- private final Configuration taskConf;
private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
private final List<String> classPaths = new ArrayList<String>();
-
+
private boolean setupCalled = false;
/**
@@ -75,9 +77,9 @@ public class TaskDistributedCacheManager
boolean localized = false;
/** The owner of the localized file. Relevant only on the tasktrackers */
final String owner;
-
- private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp,
- boolean classPath) throws IOException {
+ private CacheStatus status;
+ CacheFile(URI uri, FileType type, boolean isPublic, long timestamp,
+ boolean classPath) throws IOException {
this.uri = uri;
this.type = type;
this.isPublic = isPublic;
@@ -88,12 +90,28 @@ public class TaskDistributedCacheManager
}
/**
+     * Returns the {@link CacheStatus} recorded for this cache file.
+     *
+     * @return the recorded status, or {@code null} if none has been set yet
+     */
+    public CacheStatus getStatus() {
+      return status;
+    }
+
+    /**
+     * Records the {@link CacheStatus} for this cache file, replacing any
+     * previously recorded value.
+     *
+     * @param status the status object to associate with this cache file
+     */
+    public void setStatus(CacheStatus status) {
+      this.status = status;
+    }
+
+ /**
* Converts the scheme used by DistributedCache to serialize what files to
* cache in the configuration into CacheFile objects that represent those
* files.
*/
private static List<CacheFile> makeCacheFiles(URI[] uris,
- String[] timestamps, String cacheVisibilities[], Path[] paths,
+ long[] timestamps, boolean cacheVisibilities[], Path[] paths,
FileType type) throws IOException {
List<CacheFile> ret = new ArrayList<CacheFile>();
if (uris != null) {
@@ -109,9 +127,8 @@ public class TaskDistributedCacheManager
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
boolean isClassPath = (null != classPaths.get(u.getPath()));
- long t = Long.parseLong(timestamps[i]);
- ret.add(new CacheFile(u, type, Boolean.valueOf(cacheVisibilities[i]),
- t, isClassPath));
+ ret.add(new CacheFile(u, type, cacheVisibilities[i],
+ timestamps[i], isClassPath));
}
}
return ret;
@@ -130,7 +147,6 @@ public class TaskDistributedCacheManager
TrackerDistributedCacheManager distributedCacheManager,
Configuration taskConf) throws IOException {
this.distributedCacheManager = distributedCacheManager;
- this.taskConf = taskConf;
this.cacheFiles.addAll(
CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
@@ -147,36 +163,42 @@ public class TaskDistributedCacheManager
}
/**
- * Retrieve files into the local cache and updates the task configuration
- * (which has been passed in via the constructor).
+ * Retrieves public distributed cache files into the local cache and updates
+ * the given task configuration ({@code taskConf}) with the localized paths.
+ * For the private distributed cache, this method only determines the local
+ * paths that the files/archives should go to; the actual localization is
+ * done by {@link JobLocalizer}.
*
* It is the caller's responsibility to re-write the task configuration XML
* file, if necessary.
*/
- public void setup(LocalDirAllocator lDirAlloc, File workDir,
- String privateCacheSubdir, String publicCacheSubDir) throws IOException {
+ public void setupCache(Configuration taskConf, String publicCacheSubdir,
+ String privateCacheSubdir) throws IOException {
setupCalled = true;
- if (cacheFiles.isEmpty()) {
- return;
- }
-
ArrayList<Path> localArchives = new ArrayList<Path>();
ArrayList<Path> localFiles = new ArrayList<Path>();
- Path workdirPath = new Path(workDir.getAbsolutePath());
for (CacheFile cacheFile : cacheFiles) {
URI uri = cacheFile.uri;
FileSystem fileSystem = FileSystem.get(uri, taskConf);
FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
- String cacheSubdir = publicCacheSubDir;
- if (!cacheFile.isPublic) {
- cacheSubdir = privateCacheSubdir;
- }
- Path p = distributedCacheManager.getLocalCache(uri, taskConf,
- cacheSubdir, fileStatus,
- cacheFile.type == CacheFile.FileType.ARCHIVE,
- cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
+ Path p;
+ try {
+ if (cacheFile.isPublic) {
+ p = distributedCacheManager.getLocalCache(uri, taskConf,
+ publicCacheSubdir, fileStatus,
+ cacheFile.type == CacheFile.FileType.ARCHIVE,
+ cacheFile.timestamp, cacheFile.isPublic, cacheFile);
+ } else {
+ p = distributedCacheManager.getLocalCache(uri, taskConf,
+ privateCacheSubdir, fileStatus,
+ cacheFile.type == CacheFile.FileType.ARCHIVE,
+ cacheFile.timestamp, cacheFile.isPublic, cacheFile);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted localizing cache file", e);
+ }
cacheFile.setLocalized(true);
if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
@@ -191,10 +213,14 @@ public class TaskDistributedCacheManager
// Update the configuration object with localized data.
if (!localArchives.isEmpty()) {
+ // TODO verify
+// DistributedCache.addLocalArchives(taskConf,
TrackerDistributedCacheManager.setLocalArchives(taskConf,
stringifyPathList(localArchives));
}
if (!localFiles.isEmpty()) {
+ // TODO verify
+// DistributedCache.addLocalFiles(taskConf, stringifyPathList(localFiles));
TrackerDistributedCacheManager.setLocalFiles(taskConf,
stringifyPathList(localFiles));
}
@@ -239,9 +265,18 @@ public class TaskDistributedCacheManager
*/
public void release() throws IOException {
for (CacheFile c : cacheFiles) {
- if (c.getLocalized()) {
- distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp,
- c.owner);
+      // Release only entries that were localized and have a recorded cache
+      // status; every other entry is skipped.
+      if (!c.getLocalized() || c.status == null) {
+        continue;
+      }
+      distributedCacheManager.releaseCache(c.status);
+    }
+  }
+
+  /**
+   * Records the sizes of localized private archives with the tracker-level
+   * cache manager.
+   *
+   * <p>{@code sizes} is indexed in parallel with the cache file list: entry
+   * {@code i} describes the {@code i}-th cache file, in the order the
+   * constructor registered them. Entries for anything other than a private
+   * archive with a recorded cache status are ignored. The position therefore
+   * never depends on {@code status}, which only this tracker-side object can
+   * see.
+   * NOTE(review): assumes the caller (the private-cache localizer) reports one
+   * entry per cache file in that same order; confirm against JobLocalizer.
+   *
+   * @param sizes per-cache-file sizes, parallel to the cache file list
+   * @throws IOException if {@code sizes} has no entry for a private archive
+   *     that needs one
+   */
+  public void setSizes(long[] sizes) throws IOException {
+    for (int i = 0; i < cacheFiles.size(); i++) {
+      CacheFile c = cacheFiles.get(i);
+      if (!c.isPublic && c.type == CacheFile.FileType.ARCHIVE
+          && c.status != null) {
+        // Fail with context instead of an NPE/ArrayIndexOutOfBoundsException.
+        if (sizes == null || i >= sizes.length) {
+          throw new IOException("No size supplied for private archive "
+              + c.uri + " at cache index " + i);
+        }
+        distributedCacheManager.setSize(c.status, sizes[i]);
}
}
}
|