Subject: svn commit: r605471 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/mapred/
Date: Wed, 19 Dec 2007 08:54:19 -0000
To: hadoop-commits@lucene.apache.org
From: acmurthy@apache.org
Message-Id: <20071219085420.A45991A9838@eris.apache.org>

Author: acmurthy
Date: Wed Dec 19 00:54:17 2007
New Revision: 605471

URL: http://svn.apache.org/viewvc?rev=605471&view=rev
Log:
HADOOP-2227. Use the LocalDirAllocator uniformly for handling all of the
temporary storage required for a given task. This also means that
mapred.local.dir.minspacestart is satisfied if any one of the available
disks has enough free space. Contributed by Amareshwari Sri Ramadasu.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=605471&r1=605470&r2=605471&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Dec 19 00:54:17 2007
@@ -287,6 +287,12 @@
     transaction log, it stops writing new transactions to that one.
     (Raghu Angadi via dhruba)
 
+    HADOOP-2227. Use the LocalDirAllocator uniformly for handling all of the
+    temporary storage required for a given task. This also means that
+    mapred.local.dir.minspacestart is satisfied if any one of the available
+    disks has enough free space. (Amareshwari Sri Ramadasu
+    via acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-2160.  Remove project-level, non-user documentation from
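For readers skimming the diff below: LocalDirAllocator hands out a directory from the configured mapred.local.dir list, skipping disks that cannot hold the requested number of bytes. A minimal sketch of the allocation call this patch standardizes on; the subpath and the size figure are invented for illustration and are not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    public class ScratchSpaceSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // The allocator reads the comma-separated directory list configured
        // under the key passed to the constructor.
        LocalDirAllocator alloc = new LocalDirAllocator("mapred.local.dir");
        long estimatedSize = 64L * 1024 * 1024;  // hypothetical 64 MB requirement
        // Returns a path on some disk with at least estimatedSize bytes free;
        // throws a DiskErrorException when no single disk has enough room.
        Path scratch =
          alloc.getLocalPathForWrite("taskTracker/scratch", estimatedSize, conf);
        System.out.println("allocated " + scratch);
      }
    }

Since the allocator only ever needs one disk with sufficient room, mapred.local.dir.minspacestart is now met as soon as any single configured disk clears the threshold.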
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?rev=605471&r1=605470&r2=605471&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Wed Dec 19 00:54:17 2007
@@ -128,6 +128,7 @@
    * being used in the Configuration
    * @param conf The Configuration file which contains the filesystem
    * @param baseDir The base cache Dir where you want to localize the files/archives
+   * @param fileStatus The file status on the dfs.
    * @param isArchive if the cache is an archive or a file. In case it is an archive
    *  with a .zip or .jar extension it will be unzipped/unjarred automatically
    *  and the directory where the archive is unjarred is returned as the Path.
@@ -140,8 +141,10 @@
    *  the path to the file where the file is copied locally
    * @throws IOException
    */
-  public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
-                                   boolean isArchive, long confFileStamp, Path currentWorkDir)
+  public static Path getLocalCache(URI cache, Configuration conf,
+                                   Path baseDir, FileStatus fileStatus,
+                                   boolean isArchive, long confFileStamp,
+                                   Path currentWorkDir)
     throws IOException {
     String cacheId = makeRelative(cache, conf);
     CacheStatus lcacheStatus;
@@ -156,7 +159,7 @@
       synchronized (lcacheStatus) {
         localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
-                                      isArchive, currentWorkDir);
+                                      fileStatus, isArchive, currentWorkDir);
         lcacheStatus.refcount++;
       }
     }
@@ -173,6 +176,38 @@
   }
 
   /**
+   * Get the locally cached file or archive; it could either be
+   * previously cached (and valid) or copy it from the {@link FileSystem} now.
+   *
+   * @param cache the cache to be localized, this should be specified as
+   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no scheme
+   * or hostname:port is provided the file is assumed to be in the filesystem
+   * being used in the Configuration
+   * @param conf The Configuration file which contains the filesystem
+   * @param baseDir The base cache Dir where you want to localize the files/archives
+   * @param isArchive if the cache is an archive or a file. In case it is an archive
+   *  with a .zip or .jar extension it will be unzipped/unjarred automatically
+   *  and the directory where the archive is unjarred is returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param confFileStamp this is the hdfs file modification timestamp to verify that the
+   * file to be cached hasn't changed since the job started
+   * @param currentWorkDir this is the directory where you would want to create symlinks
+   * for the locally cached files/archives
+   * @return the path to directory where the archives are unjarred in case of archives,
+   * the path to the file where the file is copied locally
+   * @throws IOException
+   */
+  public static Path getLocalCache(URI cache, Configuration conf,
+                                   Path baseDir, boolean isArchive,
+                                   long confFileStamp, Path currentWorkDir)
+    throws IOException {
+    return getLocalCache(cache, conf,
+                         baseDir, null, isArchive,
+                         confFileStamp, currentWorkDir);
+  }
+
+  /**
    * This is the opposite of getLocalCache. When you are done with
    * using the cache, you need to release the cache
    * @param cache The cache URI to be released
@@ -220,7 +255,7 @@
    * relative path is hostname of DFS this mapred cluster is running
    * on/absolute_path
    */
-  private static String makeRelative(URI cache, Configuration conf)
+  public static String makeRelative(URI cache, Configuration conf)
     throws IOException {
     String fsname = cache.getScheme();
     String path;
@@ -243,6 +278,7 @@
   private static Path localizeCache(Configuration conf, URI cache,
                                     long confFileStamp, CacheStatus cacheStatus,
+                                    FileStatus fileStatus,
                                     boolean isArchive, Path currentWorkDir)
     throws IOException {
@@ -250,7 +286,8 @@
     FileSystem fs = getFileSystem(cache, conf);
     String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp, cacheStatus)) {
+    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
+                         cacheStatus, fileStatus)) {
       if (isArchive) {
         if (doSymlink){
           if (!flink.exists())
@@ -280,6 +317,7 @@
       localFs.delete(cacheStatus.localLoadPath);
       Path parchive = new Path(cacheStatus.localLoadPath,
                                new Path(cacheStatus.localLoadPath.getName()));
+
       if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
         throw new IOException("Mkdirs failed to create directory " +
                               cacheStatus.localLoadPath.toString());
@@ -334,13 +372,19 @@
   // Checks if the cache has already been localized and is fresh
   private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
                                           URI cache, long confFileStamp,
-                                          CacheStatus lcacheStatus)
+                                          CacheStatus lcacheStatus,
+                                          FileStatus fileStatus)
     throws IOException {
     // check for existence of the cache
     if (lcacheStatus.currentStatus == false) {
       return false;
     } else {
-      long dfsFileStamp = getTimestamp(conf, cache);
+      long dfsFileStamp;
+      if (fileStatus != null) {
+        dfsFileStamp = fileStatus.getModificationTime();
+      } else {
+        dfsFileStamp = getTimestamp(conf, cache);
+      }
 
       // ensure that the file on hdfs hasn't been modified since the job started
       if (dfsFileStamp != confFileStamp) {
@@ -382,7 +426,8 @@
    */
   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
     throws IOException {
-    if ((!jobCacheDir.isDirectory()) || (!workDir.isDirectory())){
+    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
+        workDir == null || (!workDir.isDirectory())) {
       return;
     }
     boolean createSymlink = getSymlink(conf);
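The new getLocalCache overload above lets a caller that already holds a FileStatus pass it in, so localizeCache can check freshness without a second round trip to the namenode. A hedged usage sketch; the URI, base directory, and working directory below are invented for illustration:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CacheLocalizeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI cache = new URI("hdfs://namenode:8020/libs/util.jar#util.jar");
        FileSystem fs = FileSystem.get(cache, conf);
        FileStatus status = fs.getFileStatus(new Path(cache.getPath()));
        // Hand the FileStatus over so the freshness check can reuse its
        // modification time instead of asking the namenode again.
        Path localized = DistributedCache.getLocalCache(
            cache, conf,
            new Path("/tmp/mapred/local/taskTracker/archive"), // assumed baseDir
            status,
            true,                          // archive: a .jar, so it gets unjarred
            status.getModificationTime(),  // stamp recorded at job submission
            new Path("/tmp/work"));        // assumed working dir for symlinks
        System.out.println("localized to " + localized);
      }
    }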
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java?rev=605471&r1=605470&r2=605471&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java Wed Dec 19 00:54:17 2007
@@ -165,6 +165,18 @@
     }
   }
 
+  /** We search through all the configured dirs for the file's existence
+   *  and return true when we find it.
+   *  @param pathStr the requested file (this will be searched)
+   *  @param conf the Configuration object
+   *  @return true if the file exists, false otherwise
+   */
+  public boolean ifExists(String pathStr, Configuration conf) {
+    AllocatorPerContext context = obtainContext(contextCfgItemName);
+    return context.ifExists(pathStr, conf);
+  }
+
   private static class AllocatorPerContext {
 
     private final Log LOG =
@@ -326,6 +338,31 @@
       //no path found
       throw new DiskErrorException("Could not find " + pathStr + " in any of" +
                                    " the configured local directories");
+    }
+
+    /** We search through all the configured dirs for the file's existence
+     *  and return true when we find it.
+     */
+    public synchronized boolean ifExists(String pathStr, Configuration conf) {
+      try {
+        int numDirs = localDirs.length;
+        int numDirsSearched = 0;
+        //remove the leading slash from the path (to make sure that the uri
+        //resolution results in a valid path on the dir being checked)
+        if (pathStr.startsWith("/")) {
+          pathStr = pathStr.substring(1);
+        }
+        while (numDirsSearched < numDirs) {
+          Path file = new Path(localDirs[numDirsSearched], pathStr);
+          if (localFS.exists(file)) {
+            return true;
+          }
+          numDirsSearched++;
+        }
+      } catch (IOException e) {
+        // ignore and report the file as absent
+      }
+      return false;
+    }
   }
 }
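Callers are expected to pair ifExists with the two path getters: read from whichever disk already holds the file, otherwise pick a disk with room to receive it. A minimal sketch of that pattern (the helper and its layout are ours, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    public class CachePathSketch {
      // Resolve a cache entry to a local path, reusing an existing copy if
      // any configured disk already holds it (illustrative helper).
      static Path resolve(LocalDirAllocator alloc, String cachePath,
                          FileStatus remote, Configuration conf)
          throws IOException {
        if (alloc.ifExists(cachePath, conf)) {
          return alloc.getLocalPathToRead(cachePath, conf);  // already localized
        }
        // Not present yet: size the request by the remote file's length.
        return alloc.getLocalPathForWrite(cachePath, remote.getLen(), conf);
      }
    }

The TaskRunner changes further down follow exactly this shape for both archives and plain files.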
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=605471&r1=605470&r2=605471&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Dec 19 00:54:17 2007
@@ -43,6 +43,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.InMemoryFileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -796,12 +797,19 @@
       // get the work directory which holds the elements we are dynamically
       // adding to the classpath
       File workDir = new File(task.getJobFile()).getParentFile();
-      File jobCacheDir = new File(workDir.getParent(), "work");
       ArrayList urllist = new ArrayList();
 
       // add the jars and directories to the classpath
       String jar = conf.getJar();
       if (jar != null) {
+        LocalDirAllocator lDirAlloc =
+          new LocalDirAllocator("mapred.local.dir");
+        File jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
+                                      TaskTracker.getJobCacheSubdir()
+                                      + Path.SEPARATOR + getJobId()
+                                      + Path.SEPARATOR
+                                      + "work", conf).toString());
+
         File[] libs = new File(jobCacheDir, "lib").listFiles();
         if (libs != null) {
           for (int i = 0; i < libs.length; i++) {
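Instead of assuming the job's unpacked "work" directory sits next to job.xml, the reducer now asks the allocator which disk actually holds it. A sketch of the classpath assembly under that layout; the "taskTracker/jobcache" literal stands in for TaskTracker.getJobCacheSubdir(), which may not be visible outside the mapred package, and jobId is a placeholder:

    import java.io.File;
    import java.io.IOException;
    import java.net.URL;
    import java.util.ArrayList;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    public class ClasspathSketch {
      static URL[] jobLibUrls(String jobId, Configuration conf)
          throws IOException {
        LocalDirAllocator alloc = new LocalDirAllocator("mapred.local.dir");
        // Find the disk that holds the job's unpacked "work" directory.
        File jobCacheDir = new File(alloc.getLocalPathToRead(
            "taskTracker/jobcache" + Path.SEPARATOR + jobId
            + Path.SEPARATOR + "work", conf).toString());
        ArrayList<URL> urls = new ArrayList<URL>();
        File[] libs = new File(jobCacheDir, "lib").listFiles();
        if (libs != null) {
          for (File lib : libs) {
            urls.add(lib.toURI().toURL());  // each lib jar joins the classpath
          }
        }
        return urls.toArray(new URL[urls.size()]);
      }
    }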
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=605471&r1=605470&r2=605471&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Wed Dec 19 00:54:17 2007
@@ -20,6 +20,7 @@
 
 import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
 import java.io.*;
@@ -91,18 +92,52 @@
       //all the archives
       File workDir = new File(t.getJobFile()).getParentFile();
       String taskid = t.getTaskId();
-      File jobCacheDir = new File(workDir.getParent(), "work");
+      LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+      File jobCacheDir = null;
+      try {
+        jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
+                                 TaskTracker.getJobCacheSubdir()
+                                 + Path.SEPARATOR + t.getJobId()
+                                 + Path.SEPARATOR
+                                 + "work", conf).toString());
+      } catch (IOException ioe) {
+        LOG.warn("work directory doesn't exist");
+      }
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
+      FileStatus fileStatus;
+      FileSystem fileSystem;
+      Path localPath;
+      String baseDir;
+
       if ((archives != null) || (files != null)) {
         if (archives != null) {
-          String[] archivesTimestamps = DistributedCache.getArchiveTimestamps(conf);
+          String[] archivesTimestamps =
+            DistributedCache.getArchiveTimestamps(conf);
           Path[] p = new Path[archives.length];
           for (int i = 0; i < archives.length; i++) {
+            fileSystem = FileSystem.get(archives[i], conf);
+            fileStatus = fileSystem.getFileStatus(
+                           new Path(archives[i].getPath()));
+            String cacheId = DistributedCache.makeRelative(archives[i], conf);
+            String cachePath = TaskTracker.getCacheSubdir()
+                               + Path.SEPARATOR + cacheId;
+            if (lDirAlloc.ifExists(cachePath, conf)) {
+              localPath = lDirAlloc.getLocalPathToRead(cachePath, conf);
+            } else {
+              localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+                            fileStatus.getLen(), conf);
+            }
+            baseDir = localPath.toString().replace(cacheId, "");
             p[i] = DistributedCache.getLocalCache(archives[i], conf,
-                                                  conf.getLocalPath(TaskTracker.getCacheSubdir()),
-                                                  true, Long.parseLong(archivesTimestamps[i]),
-                                                  new Path(workDir.getAbsolutePath()));
+                                                  new Path(baseDir),
+                                                  fileStatus, true,
+                                                  Long.parseLong(archivesTimestamps[i]),
+                                                  new Path(workDir.getAbsolutePath()));
+          }
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
         }
@@ -110,10 +145,26 @@
           String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
           Path[] p = new Path[files.length];
           for (int i = 0; i < files.length; i++) {
+            fileSystem = FileSystem.get(files[i], conf);
+            fileStatus = fileSystem.getFileStatus(
+                           new Path(files[i].getPath()));
+            String cacheId = DistributedCache.makeRelative(files[i], conf);
+            String cachePath = TaskTracker.getCacheSubdir()
+                               + Path.SEPARATOR + cacheId;
+            if (lDirAlloc.ifExists(cachePath, conf)) {
+              localPath = lDirAlloc.getLocalPathToRead(cachePath, conf);
+            } else {
+              localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+                            fileStatus.getLen(), conf);
+            }
+            baseDir = localPath.toString().replace(cacheId, "");
             p[i] = DistributedCache.getLocalCache(files[i], conf,
-                                                  conf.getLocalPath(TaskTracker.getCacheSubdir()),
-                                                  false, Long.parseLong(fileTimestamps[i]),
-                                                  new Path(workDir.getAbsolutePath()));
+                                                  new Path(baseDir),
+                                                  fileStatus, false,
+                                                  Long.parseLong(fileTimestamps[i]),
+                                                  new Path(workDir.getAbsolutePath()));
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
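Both loops above fetch the FileStatus once per cache URI and thread it through to getLocalCache, which can then compare modification times without another namenode call. The freshness test in isolation, as a hedged sketch (class and method names are ours):

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FreshnessSketch {
      // confFileStamp is the modification time recorded when the job was
      // submitted; any drift means the file changed after the job started.
      static boolean isFresh(URI cache, long confFileStamp, Configuration conf)
          throws IOException {
        FileSystem fs = FileSystem.get(cache, conf);
        FileStatus status = fs.getFileStatus(new Path(cache.getPath()));
        return status.getModificationTime() == confFileStamp;
      }
    }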
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=605471&r1=605470&r2=605471&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Dec 19 00:54:17 2007
@@ -53,6 +53,7 @@
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -601,20 +602,44 @@
     }
   }
 
+  private LocalDirAllocator lDirAlloc =
+    new LocalDirAllocator("mapred.local.dir");
+
   // initialize the job directory
   private void localizeJob(TaskInProgress tip) throws IOException {
     Path localJarFile = null;
     Task t = tip.getTask();
     String jobId = t.getJobId();
-    Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
-                                 jobId + Path.SEPARATOR + "job.xml");
+    String jobFile = t.getJobFile();
+    // Get the sizes of the job file and the jar file;
+    // sizes are -1 if they are not present.
+    FileSystem fileSystem = FileSystem.get(fConf);
+    FileStatus status[] = fileSystem.listStatus(new Path(jobFile).getParent());
+    long jarFileSize = -1;
+    long jobFileSize = -1;
+    for (FileStatus stat : status) {
+      if (stat.getPath().toString().contains("job.xml")) {
+        jobFileSize = stat.getLen();
+      }
+      if (stat.getPath().toString().contains("job.jar")) {
+        jarFileSize = stat.getLen();
+      }
+    }
+    // Ask for twice the size of the job file to accommodate the localized
+    // task file, and five times the size of the jar file to accommodate
+    // unjarring it in the work directory.
+    Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
+                                    + Path.SEPARATOR + jobId
+                                    + Path.SEPARATOR + "job.xml"),
+                                    2 * jobFileSize + 5 * jarFileSize, fConf);
+
     RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
     synchronized (rjob) {
       if (!rjob.localized) {
-        localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
-                                jobId + Path.SEPARATOR + "job.jar");
-        String jobFile = t.getJobFile();
         FileSystem localFs = FileSystem.getLocal(fConf);
         // this will happen on a partial execution of localizeJob.
         // Sometimes the job.xml gets copied but copying job.jar
@@ -632,6 +657,7 @@
         JobConf localJobConf = new JobConf(localJobFile);
         String jarFile = localJobConf.getJar();
         if (jarFile != null) {
+          localJarFile = new Path(jobDir, "job.jar");
           fs.copyToLocalFile(new Path(jarFile), localJarFile);
           localJobConf.setJar(localJarFile.toString());
           OutputStream out = localFs.create(localJobFile);
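The sizing rule in localizeJob reads as a small pure function. A sketch; the multipliers come straight from the patch, while the class, method, and subdir literal are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    public class JobSpaceSketch {
      // Twice job.xml (the original plus the per-task localized copy) and
      // five times job.jar (the jar plus its unjarred contents).
      static long requiredBytes(long jobFileSize, long jarFileSize) {
        return 2 * jobFileSize + 5 * jarFileSize;
      }

      static Path allocateJobXml(String jobId, long jobFileSize,
                                 long jarFileSize, Configuration conf)
          throws IOException {
        LocalDirAllocator alloc = new LocalDirAllocator("mapred.local.dir");
        return alloc.getLocalPathForWrite(
            "taskTracker/jobcache/" + jobId + "/job.xml",
            requiredBytes(jobFileSize, jarFileSize), conf);
      }
    }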
@@ -1208,11 +1234,11 @@
         localDirsDf.put(localDirs[i], df);
       }
 
-      if (df.getAvailable() < minSpace)
-        return false;
+      if (df.getAvailable() > minSpace)
+        return true;
     }
 
-    return true;
+    return false;
   }
 
   /**
@@ -1345,9 +1371,10 @@
   }
 
   private void localizeTask(Task task) throws IOException {
-    Path localTaskDir =
-      new Path(this.defaultJobConf.getLocalPath(TaskTracker.getJobCacheSubdir()),
-               (task.getJobId() + Path.SEPARATOR + task.getTaskId()));
+    Path localTaskDir =
+      lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir()
+                 + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR
+                 + task.getTaskId()), defaultJobConf);
     FileSystem localFs = FileSystem.getLocal(fConf);
     if (!localFs.mkdirs(localTaskDir)) {
       throw new IOException("Mkdirs failed to create " + localTaskDir.toString());
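The flipped comparisons in enoughFreeSpace change its meaning from "every disk must clear minSpace" to "at least one disk must clear minSpace", which matches the allocator-driven model: a task can start as long as some disk has room. A standalone sketch of the resulting semantics; Java 6's File.getUsableSpace stands in for Hadoop's DF helper, and the directories are placeholders:

    import java.io.File;

    public class FreeSpaceSketch {
      // True if ANY configured directory has more than minSpace bytes
      // available (the post-patch semantics of enoughFreeSpace).
      static boolean enoughFreeSpace(String[] localDirs, long minSpace) {
        if (minSpace == 0) {
          return true;  // no minimum configured
        }
        for (String dir : localDirs) {
          if (new File(dir).getUsableSpace() > minSpace) {
            return true;  // one disk with room is enough to start a task
          }
        }
        return false;  // no single disk clears the threshold
      }

      public static void main(String[] args) {
        String[] dirs = { "/tmp/mapred/local1", "/tmp/mapred/local2" };
        System.out.println(enoughFreeSpace(dirs, 64L * 1024 * 1024));
      }
    }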