Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 62028 invoked from network); 17 Jun 2010 12:28:39 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 17 Jun 2010 12:28:39 -0000 Received: (qmail 84474 invoked by uid 500); 17 Jun 2010 10:41:59 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 84391 invoked by uid 500); 17 Jun 2010 10:41:56 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 84375 invoked by uid 99); 17 Jun 2010 10:41:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jun 2010 10:41:55 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jun 2010 10:41:54 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 7C68A23889E7; Thu, 17 Jun 2010 10:41:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r955543 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Date: Thu, 17 Jun 2010 10:41:08 -0000 To: mapreduce-commits@hadoop.apache.org From: amareshwari@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100617104108.7C68A23889E7@eris.apache.org> Author: amareshwari Date: Thu Jun 17 10:41:08 2010 New Revision: 955543 URL: http://svn.apache.org/viewvc?rev=955543&view=rev Log: MAPREDUCE-1225. Fixes DistributedCache to check if the file is fresh or not, for the first localization also. Contributed by Zhong Wang. Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=955543&r1=955542&r2=955543&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jun 17 10:41:08 2010 @@ -92,6 +92,9 @@ Trunk (unreleased changes) MAPREDUCE-1813. NPE in PipeMapred.MRErrorThread. (Ravi Gummadi via vinodkv) + MAPREDUCE-1225. Fixes DistributedCache to check if the file is fresh or not, + for the first localization also. (Zhong Wang via amareshwari) + Release 0.21.0 - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=955543&r1=955542&r2=955543&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Thu Jun 17 10:41:08 2010 @@ -174,8 +174,11 @@ public class TrackerDistributedCacheMana // do the localization, after releasing the global lock synchronized (lcacheStatus) { if (!lcacheStatus.isInited()) { + FileSystem fs = FileSystem.get(cache, conf); + checkStampSinceJobStarted(conf, fs, cache, confFileStamp, + lcacheStatus, fileStatus); localizedPath = localizeCache(conf, cache, confFileStamp, - lcacheStatus, fileStatus, isArchive, isPublic); + lcacheStatus, isArchive, isPublic); lcacheStatus.initComplete(); } else { localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp, @@ -404,7 +407,6 @@ public class TrackerDistributedCacheMana Path localizeCache(Configuration conf, URI cache, long confFileStamp, CacheStatus cacheStatus, - FileStatus fileStatus, boolean isArchive, boolean isPublic) throws IOException { FileSystem fs = FileSystem.get(cache, conf); @@ -488,9 +490,9 @@ public class TrackerDistributedCacheMana return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") || filename.endsWith(".tar")); } - - // Checks if the cache has already been localized and is fresh - private boolean ifExistsAndFresh(Configuration conf, FileSystem fs, + + // ensure that the file on hdfs hasn't been modified since the job started + private long checkStampSinceJobStarted(Configuration conf, FileSystem fs, URI cache, long confFileStamp, CacheStatus lcacheStatus, FileStatus fileStatus) @@ -502,13 +504,23 @@ public class TrackerDistributedCacheMana dfsFileStamp = getTimestamp(conf, cache); } - // ensure that the file on hdfs hasn't been modified since the job started if (dfsFileStamp != confFileStamp) { LOG.fatal("File: " + cache + " has changed on HDFS since job started"); throw new IOException("File: " + cache + " has changed on HDFS since job started"); } + + return dfsFileStamp; + } + // Checks if the cache has already been localized and is fresh + private boolean ifExistsAndFresh(Configuration conf, FileSystem fs, + URI cache, long confFileStamp, + CacheStatus lcacheStatus, + FileStatus fileStatus) + throws IOException { + long dfsFileStamp = checkStampSinceJobStarted(conf, fs, cache, + confFileStamp, lcacheStatus, fileStatus); if (dfsFileStamp != lcacheStatus.mtime) { return false; } Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=955543&r1=955542&r2=955543&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Thu Jun 17 10:41:08 2010 @@ -202,13 +202,13 @@ public class TestTrackerDistributedCache @Override Path localizeCache(Configuration conf, URI cache, long confFileStamp, - CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive, - boolean isPublic) throws IOException { + CacheStatus cacheStatus, boolean isArchive, boolean isPublic) + throws IOException { if (cache.equals(firstCacheFile.toUri())) { throw new IOException("fake fail"); } return super.localizeCache(conf, cache, confFileStamp, cacheStatus, - fileStatus, isArchive, isPublic); + isArchive, isPublic); } } @@ -426,6 +426,12 @@ public class TestTrackerDistributedCache protected String getJobOwnerName() throws IOException { return UserGroupInformation.getLoginUser().getUserName(); } + + private long getFileStamp(Path file) throws IOException { + FileStatus fileStatus = fs.getFileStatus(file); + return fileStatus.getModificationTime(); + } + /** test delete cache */ public void testDeleteCache() throws Exception { @@ -448,7 +454,6 @@ public class TestTrackerDistributedCache new TrackerDistributedCacheManager(conf2, taskController); manager.startCleanupThread(); FileSystem localfs = FileSystem.getLocal(conf2); - long now = System.currentTimeMillis(); String userName = getJobOwnerName(); conf2.set(MRJobConfig.USER_NAME, userName); @@ -456,8 +461,9 @@ public class TestTrackerDistributedCache Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(firstCacheFile), false, - now, new Path(TEST_ROOT_DIR), false, false); - manager.releaseCache(firstCacheFile.toUri(), conf2, now); + getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false); + manager.releaseCache(firstCacheFile.toUri(), conf2, + getFileStamp(firstCacheFile)); //in above code,localized a file of size 4K and then release the cache // which will cause the cache be deleted when the limit goes out. // The below code localize another cache which's designed to @@ -465,7 +471,7 @@ public class TestTrackerDistributedCache manager.getLocalCache(secondCacheFile.toUri(), conf2, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(secondCacheFile), false, - System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false); + getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, false); checkCacheDeletion(localfs, localCache, "DistributedCache failed " + "deleting old cache when the cache store is full."); // Now we test the number of sub directories limit @@ -479,15 +485,16 @@ public class TestTrackerDistributedCache Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(thirdCacheFile), false, - now, new Path(TEST_ROOT_DIR), false, false); + getFileStamp(thirdCacheFile), new Path(TEST_ROOT_DIR), false, false); // Release the third cache so that it can be deleted while sweeping - manager.releaseCache(thirdCacheFile.toUri(), conf2, now); + manager.releaseCache(thirdCacheFile.toUri(), conf2, + getFileStamp(thirdCacheFile)); // Getting the fourth cache will make the number of sub directories becomes // 3 which is greater than 2. So the released cache will be deleted. manager.getLocalCache(fourthCacheFile.toUri(), conf2, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(fourthCacheFile), false, - System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false); + getFileStamp(fourthCacheFile), new Path(TEST_ROOT_DIR), false, false); checkCacheDeletion(localfs, thirdLocalCache, "DistributedCache failed deleting old" + " cache when the cache exceeds the number of sub directories limit."); @@ -530,7 +537,7 @@ public class TestTrackerDistributedCache Path result = manager.getLocalCache(fileToCache.toUri(), conf, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(firstCacheFile), false, - System.currentTimeMillis(), + getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false); assertNotNull("DistributedCache cached file on non-default filesystem.", result); @@ -654,6 +661,27 @@ public class TestTrackerDistributedCache // release handle.release(); + // running a task of the same job on another TaskTracker which has never + // initialized the cache + TrackerDistributedCacheManager manager2 = + new TrackerDistributedCacheManager(myConf, taskController); + TaskDistributedCacheManager handle2 = + manager2.newTaskDistributedCacheManager(subConf); + File workDir2 = new File(new Path(TEST_ROOT_DIR, "workdir2").toString()); + th = null; + try { + handle2.setup(localDirAllocator, workDir2, TaskTracker + .getPrivateDistributedCacheDir(userName), + TaskTracker.getPublicDistributedCacheDir()); + } catch (IOException ie) { + th = ie; + } + assertNotNull("Throwable is null", th); + assertTrue("Exception message does not match", + th.getMessage().contains("has changed on HDFS since job started")); + // release + handle.release(); + // submit another job Configuration subConf2 = new Configuration(myConf); subConf2.set(MRJobConfig.USER_NAME, userName); @@ -696,13 +724,12 @@ public class TestTrackerDistributedCache TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(conf, taskController); FileSystem localfs = FileSystem.getLocal(conf); - long now = System.currentTimeMillis(); Path[] localCache = new Path[2]; localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(firstCacheFile), false, - now, new Path(TEST_ROOT_DIR), false, false); + getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false); FsPermission myPermission = new FsPermission((short)0600); Path myFile = new Path(localCache[0].getParent(), "myfile.txt"); if (FileSystem.create(localfs, myFile, myPermission) == null) { @@ -712,7 +739,8 @@ public class TestTrackerDistributedCache localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(secondCacheFile), false, - System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false); + getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, + false); FileStatus stat = localfs.getFileStatus(myFile); assertTrue(stat.getPermission().equals(myPermission)); // validate permissions of localized files.