Subject: svn commit: r1077584 - /hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java
Date: Fri, 04 Mar 2011 04:31:53 -0000
From: omalley@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Message-Id: <20110304043153.BA2E623889E7@eris.apache.org>

Author: omalley
Date: Fri Mar  4 04:31:53 2011
New Revision: 1077584

URL: http://svn.apache.org/viewvc?rev=1077584&view=rev
Log:
commit 0c8304a49108bb0bd5e21f0eab1fbdef6671a67d
Author: Vinay Kumar Thota
Date:   Fri Jul 23 06:29:56 2010 +0000

    MAPREDUCE:1919 from https://issues.apache.org/jira/secure/attachment/12450269/1919-ydist-security.patch
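The patch below exercises the submission-side DistributedCache API that the new test verifies. As a minimal sketch of that pattern (the hdfs:///tmp/cache1.txt path, the surrounding conf object, and the job setup are assumptions for illustration, not part of this patch):

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    // Build a job configuration from an existing cluster Configuration.
    JobConf jobConf = new JobConf(conf);
    // Ask the framework to create symlinks to localized cache files
    // in each task's working directory.
    DistributedCache.createSymlink(jobConf);
    // Register an HDFS file with the cache; each TaskTracker localizes it
    // once and reference-counts it across the tasks that use it.
    DistributedCache.addCacheFile(URI.create("hdfs:///tmp/cache1.txt"), jobConf);
    // Submit and wait for completion.
    JobClient.runJob(jobConf);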
Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java?rev=1077584&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java Fri Mar  4 04:31:53 2011
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Verifies localization of a job's distributed cache files.
+ */
+public class TestCacheFileReferenceCount {
+  private static final Log LOG = LogFactory
+      .getLog(TestCacheFileReferenceCount.class);
+  private static Configuration conf = new Configuration();
+  private static MRCluster cluster;
+  private static Path tmpFolderPath = null;
+  private static String cacheFile1 = "cache1.txt";
+  private static String cacheFile2 = "cache2.txt";
+  private static String cacheFile3 = "cache3.txt";
+  private static String cacheFile4 = "cache4.txt";
+  private static JTProtocol wovenClient = null;
+  private static JobClient jobClient = null;
+  private static JTClient jtClient = null;
+  private static URI cacheFileURI1;
+  private static URI cacheFileURI2;
+  private static URI cacheFileURI3;
+  private static URI cacheFileURI4;
+
+  @BeforeClass
+  public static void before() throws Exception {
+    cluster = MRCluster.createCluster(conf);
+    cluster.setUp();
+    tmpFolderPath = new Path("hdfs:///tmp");
+    jtClient = cluster.getJTClient();
+    jobClient = jtClient.getClient();
+    wovenClient = cluster.getJTClient().getProxy();
+    cacheFileURI1 = createCacheFile(tmpFolderPath, cacheFile1);
+    cacheFileURI2 = createCacheFile(tmpFolderPath, cacheFile2);
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile1));
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile2));
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile4));
+    cluster.tearDown();
+  }
+
+  /**
+   * Runs a job with two distributed cache files and verifies
+   * that the job succeeds.
+   * @throws Exception
+   */
+  @Test
+  public void testCacheFilesLocalization() throws Exception {
+    conf = wovenClient.getDaemonConf();
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
+    DistributedCache.createSymlink(jobConf);
+    DistributedCache.addCacheFile(cacheFileURI1, jobConf);
+    DistributedCache.addCacheFile(cacheFileURI2, jobConf);
+    RunningJob runJob = jobClient.submitJob(jobConf);
+    JobID jobId = runJob.getID();
+
+    Assert.assertTrue("Job has not started within 1 min.",
+        jtClient.isJobStarted(jobId));
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
+    Assert.assertTrue("Cache file1 has not been localized",
+        checkLocalization(taskInfos, cacheFile1));
+    Assert.assertTrue("Cache file2 has not been localized",
+        checkLocalization(taskInfos, cacheFile2));
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    LOG.info("Waiting till the job is completed...");
+    while (!jInfo.getStatus().isJobComplete()) {
+      UtilsForTests.waitFor(100);
+      jInfo = wovenClient.getJobInfo(jobId);
+    }
+    Assert.assertEquals("Job has not succeeded",
+        JobStatus.SUCCEEDED, jInfo.getStatus().getRunState());
+  }
+
+  /**
+   * Runs a job with a distributed cache file and removes the cache
+   * file from DFS after it has been localized. Verifies that the
+   * job fails.
+   * @throws Exception
+   */
+  @Test
+  public void testDeleteCacheFileInDFSAfterLocalized() throws Exception {
+    conf = wovenClient.getDaemonConf();
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
+    cacheFileURI3 = createCacheFile(tmpFolderPath, cacheFile3);
+    DistributedCache.createSymlink(jobConf);
+    DistributedCache.addCacheFile(cacheFileURI3, jobConf);
+    RunningJob runJob = jobClient.submitJob(jobConf);
+    JobID jobId = runJob.getID();
+    Assert.assertTrue("Job has not started within 1 min.",
+        jtClient.isJobStarted(jobId));
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
+    boolean isCacheFileLocalized = checkLocalization(taskInfos, cacheFile3);
+    Assert.assertTrue("Cache file has not been localized",
+        isCacheFileLocalized);
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile3));
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    LOG.info("Waiting till the job is completed...");
+    while (!jInfo.getStatus().isJobComplete()) {
+      UtilsForTests.waitFor(100);
+      jInfo = wovenClient.getJobInfo(jobId);
+    }
+    Assert.assertEquals("Job has not failed",
+        JobStatus.FAILED, jInfo.getStatus().getRunState());
+  }
+
+  /**
+   * Runs a job with a distributed cache file whose size is larger
+   * than local.cache.size and verifies that the job succeeds.
+   * @throws Exception
+   */
+  @Test
+  public void testCacheSizeExceeds() throws Exception {
+    conf = wovenClient.getDaemonConf();
+    SleepJob job = new SleepJob();
+    String[] jobArgs = {"-D", "local.cache.size=1024",
+                        "-m", "4",
+                        "-r", "2",
+                        "-mt", "2000",
+                        "-rt", "2000",
+                        "-recordt", "100"};
+    JobConf jobConf = new JobConf(conf);
+    cacheFileURI4 = createCacheFile(tmpFolderPath, cacheFile4);
+    DistributedCache.createSymlink(jobConf);
+    DistributedCache.addCacheFile(cacheFileURI4, jobConf);
+    int countBeforeJS = jtClient.getClient().getAllJobs().length;
+    JobID prvJobId = jtClient.getClient().getAllJobs()[0].getJobID();
+    int exitCode = ToolRunner.run(jobConf, job, jobArgs);
+    Assert.assertEquals("Exit Code:", 0, exitCode);
+    // Wait (up to 30s) until the job count reflects the newly submitted job.
+    int countAfterJS = jtClient.getClient().getAllJobs().length;
+    int counter = 0;
+    while (counter++ < 30) {
+      if (countBeforeJS == countAfterJS) {
+        UtilsForTests.waitFor(1000);
+        countAfterJS = jtClient.getClient().getAllJobs().length;
+      } else {
+        break;
+      }
+    }
+    // Wait (up to 30s) until the most recent job id differs from the
+    // previously observed one.
+    JobID jobId = jtClient.getClient().getAllJobs()[0].getJobID();
+    counter = 0;
+    while (counter++ < 30) {
+      if (jobId.toString().equals(prvJobId.toString())) {
+        UtilsForTests.waitFor(1000);
+        jobId = jtClient.getClient().getAllJobs()[0].getJobID();
+      } else {
+        break;
+      }
+    }
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    Assert.assertEquals("Job has not succeeded",
+        JobStatus.SUCCEEDED, jInfo.getStatus().getRunState());
+  }
+
+  /**
+   * Checks whether the given cache file has been localized on at least
+   * one of the TaskTrackers that ran the job's tasks.
+   */
+  private boolean checkLocalization(TaskInfo[] taskInfos, String cacheFile)
+      throws Exception {
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()) {
+        String[] taskTrackers = taskinfo.getTaskTrackers();
+        List<TTClient> ttList = getTTClients(taskTrackers);
+        for (TTClient ttClient : ttList) {
+          if (checkCacheFile(ttClient, cacheFile)) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  private List<TTClient> getTTClients(String[] taskTrackers)
+      throws Exception {
+    List<TTClient> ttClientList = new ArrayList<TTClient>();
+    for (String taskTracker : taskTrackers) {
+      taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+      TTClient ttClient = cluster.getTTClient(taskTracker);
+      if (ttClient != null) {
+        ttClientList.add(ttClient);
+      }
+    }
+    return ttClientList;
+  }
+
+  /**
+   * Scans the public distributed cache directories of the given
+   * TaskTracker for the cache file.
+   */
+  private boolean checkCacheFile(TTClient ttClient, String cacheFile)
+      throws IOException {
+    String[] localDirs = ttClient.getMapredLocalDirs();
+    for (String localDir : localDirs) {
+      localDir = localDir + Path.SEPARATOR
+          + TaskTracker.getPublicDistributedCacheDir();
+      FileStatus[] fileStatuses = ttClient.listStatus(localDir,
+          true, true);
+      for (FileStatus fileStatus : fileStatuses) {
+        Path path = fileStatus.getPath();
+        if ((path.toString()).endsWith(cacheFile)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private static void deleteCacheFile(Path cacheFilePath)
+      throws IOException {
+    FileSystem dfs = jobClient.getFs();
+    dfs.delete(cacheFilePath, true);
+  }
+
+  private static URI createCacheFile(Path tmpFolderPath, String cacheFile)
+      throws IOException {
+    String input = "distribute cache content...";
+    FileSystem dfs = jobClient.getFs();
+    conf = wovenClient.getDaemonConf();
+    FileSystem fs = tmpFolderPath.getFileSystem(conf);
+    if (!fs.mkdirs(tmpFolderPath)) {
+      throw new IOException("Failed to create the temp directory: "
+          + tmpFolderPath.toString());
+    }
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile));
+    DataOutputStream file = fs.create(new Path(tmpFolderPath, cacheFile));
+    int i = 0;
+    while (i++ < 100) {
+      file.writeBytes(input);
+    }
+    file.close();
+    dfs.setPermission(tmpFolderPath, new FsPermission(FsAction.ALL,
+        FsAction.ALL, FsAction.ALL));
+    URI uri = URI.create(new Path(tmpFolderPath, cacheFile).toString());
+    return uri;
+  }
+}
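
On the task side (illustrative only, not part of this commit): when a cache URI carries a fragment, e.g. hdfs:///tmp/cache1.txt#cache1.txt, and createSymlink is enabled as in the tests above, the localized file appears as a symlink named cache1.txt in the task's working directory, so task code could read it like a local file:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    // Inside a map() or configure() implementation; "cache1.txt" is the
    // assumed symlink name taken from the URI fragment.
    BufferedReader reader = new BufferedReader(new FileReader("cache1.txt"));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        // Use the cached content, e.g. to populate a lookup table.
      }
    } finally {
      reader.close();
    }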