Subject: svn commit: r443497 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/mapred/ Date: Thu, 14 Sep 2006 22:13:44 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org Message-Id: <20060914221344.F22671A981A@eris.apache.org> Author: cutting Date: Thu Sep 14
15:13:43 2006 New Revision: 443497 URL: http://svn.apache.org/viewvc?view=rev&rev=443497 Log: HADOOP-288. Add a file caching system and use it in MapReduce to cache job jar files on slave nodes. Contributed by Mahadev. Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar (with props) lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip (with props) Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/build.xml lucene/hadoop/trunk/conf/hadoop-default.xml lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=443497&r1=443496&r2=443497 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Sep 14 15:13:43 2006 @@ -14,6 +14,9 @@ 3. HADOOP-530. Improve error messages in SequenceFile when keys or values are of the wrong type. (Hairong Kuang via cutting) +4. HADOOP-288. Add a file caching system and use it in MapReduce to + cache job jar files on slave nodes. 
(Mahadev Konar via cutting) + Release 0.6.1 - 2006-08-13 Modified: lucene/hadoop/trunk/build.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=443497&r1=443496&r2=443497 ============================================================================== --- lucene/hadoop/trunk/build.xml (original) +++ lucene/hadoop/trunk/build.xml Thu Sep 14 15:13:43 2006 @@ -36,6 +36,7 @@ + @@ -277,6 +278,11 @@ value="org/apache/hadoop/test/AllTestDriver"/> + + + + + @@ -288,7 +294,6 @@ - Modified: lucene/hadoop/trunk/conf/hadoop-default.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=443497&r1=443496&r2=443497 ============================================================================== --- lucene/hadoop/trunk/conf/hadoop-default.xml (original) +++ lucene/hadoop/trunk/conf/hadoop-default.xml Thu Sep 14 15:13:43 2006 @@ -271,6 +271,14 @@ + local.cache.size + 10737418240 + The limit on the size of cache you want to keep, set by default + to 10GB. This will act as a soft limit on the cache directory for out of band data. + + + + mapred.system.dir ${hadoop.tmp.dir}/mapred/system The shared directory where MapReduce stores control files. Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=auto&rev=443497 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Thu Sep 14 15:13:43 2006 @@ -0,0 +1,466 @@ +/* Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.filecache; + +import org.apache.commons.logging.*; +import java.io.*; +import java.util.*; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.util.*; +import org.apache.hadoop.fs.*; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.net.URI; + +/******************************************************************************* + * The DistributedCache maintains all the caching information of cached files + * and archives, unarchives the archives, and returns their local paths. + * + * @author Mahadev Konar + ******************************************************************************/ +public class DistributedCache { + // cacheID to cacheStatus mapping + private static TreeMap cachedArchives = new TreeMap(); + // buffer size for reading checksum files + private static final int CRC_BUFFER_SIZE = 64 * 1024; + + /** + * + * @param cache the cache to be localized, this should be specified as + * new URI(dfs://hostname:port/absolute_path_to_file). If no scheme + * or hostname:port is provided, the file is assumed to be in the filesystem + * being used in the Configuration + * @param conf The Configuration file which contains the filesystem + * @param baseDir The base cache Dir where you want to localize the files/archives + * @param isArchive whether the cache is an archive or a file. If it is an archive + * with a .zip or .jar extension, it will be unzipped/unjarred automatically + * and the directory where the archive is unjarred is returned as the Path.
+ * In case of a file, the path to the file is returned. + * @param md5 this is a mere checksum to verify if you are using the right cache. + * You need to pass the md5 of the crc file in DFS. This is matched against the one + * calculated in this API and if it does not match, the cache is not localized. + * @return the path to the directory where the archives are unjarred in case of archives, + * or the path to the local copy of the file in case of a file + * @throws IOException + */ + public static Path getLocalCache(URI cache, Configuration conf, Path baseDir, + boolean isArchive, String md5) throws IOException { + String cacheId = makeRelative(cache, conf); + CacheStatus lcacheStatus; + Path localizedPath; + synchronized (cachedArchives) { + if (!cachedArchives.containsKey(cacheId)) { + // was never localized + lcacheStatus = new CacheStatus(); + lcacheStatus.currentStatus = false; + lcacheStatus.refcount = 1; + lcacheStatus.localLoadPath = new Path(baseDir, new Path(cacheId)); + cachedArchives.put(cacheId, lcacheStatus); + } else { + lcacheStatus = (CacheStatus) cachedArchives.get(cacheId); + synchronized (lcacheStatus) { + lcacheStatus.refcount++; + } + } + } + synchronized (lcacheStatus) { + localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5); + } + // try deleting unused caches if the size limit is exceeded + long size = FileUtil.getDU(new File(baseDir.toString())); + // setting the cache size to a default of 1MB + long allowedSize = conf.getLong("local.cache.size", 1048576L); + if (allowedSize < size) { + // try some cache deletions + deleteCache(conf); + } + return localizedPath; + } + + /** + * This is the opposite of getLocalCache. When you are done + * using the cache, you need to release it. + * @param cache The cache URI to be released + * @param conf configuration which contains the filesystem the cache + * is contained in.
+ * @throws IOException + */ + public static void releaseCache(URI cache, Configuration conf) + throws IOException { + String cacheId = makeRelative(cache, conf); + synchronized (cachedArchives) { + CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId); + synchronized (lcacheStatus) { + lcacheStatus.refcount--; + } + } + } + + // Deletes the caches which have a refcount of zero + + private static void deleteCache(Configuration conf) throws IOException { + // try deleting cache entries with a refcount of zero + synchronized (cachedArchives) { + for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) { + String cacheId = (String) it.next(); + CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId); + if (lcacheStatus.refcount == 0) { + // delete this cache entry + FileSystem.getNamed("local", conf).delete(lcacheStatus.localLoadPath); + cachedArchives.remove(cacheId); + } + } + } + } + + /* + * Returns the path, relative to the cache base directory, that this + * cache will be localized in.
For + * dfs://hostname:port/absolute_path -- the relative path is + * hostname/absolute_path -- if it is just /absolute_path -- then the + * relative path is the hostname of the DFS this mapred cluster is running + * on/absolute_path + */ + private static String makeRelative(URI cache, Configuration conf) + throws IOException { + String fsname = cache.getScheme(); + String path; + FileSystem dfs = FileSystem.get(conf); + if ("dfs".equals(fsname)) { + path = cache.getHost() + cache.getPath(); + } else { + String[] split = dfs.getName().split(":"); + path = split[0] + cache.getPath(); + } + return path; + } + + private static Path cacheFilePath(Path p) { + return new Path(p, p.getName()); + } + + // the method which actually copies the caches locally and unjars/unzips them + private static Path localizeCache(URI cache, CacheStatus cacheStatus, + Configuration conf, boolean isArchive, String md5) throws IOException { + boolean b = true; + FileSystem dfs = getFileSystem(cache, conf); + b = ifExistsAndFresh(cacheStatus, cache, dfs, md5, conf); + if (b) { + if (isArchive) + return cacheStatus.localLoadPath; + else + return cacheFilePath(cacheStatus.localLoadPath); + } else { + // remove the old archive + // if the old archive cannot be removed since it is being used by another + // job + // throw an IOException + if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true)) + throw new IOException("Cache " + cacheStatus.localLoadPath.toString() + + " is in use and cannot be refreshed"); + byte[] checkSum = createMD5(cache, conf); + FileSystem localFs = FileSystem.getNamed("local", conf); + localFs.delete(cacheStatus.localLoadPath); + Path parchive = new Path(cacheStatus.localLoadPath, + new Path(cacheStatus.localLoadPath.getName())); + + localFs.mkdirs(cacheStatus.localLoadPath); + String cacheId = cache.getPath(); + dfs.copyToLocalFile(new Path(cacheId), parchive); + dfs.copyToLocalFile(new Path(cacheId + "_md5"), new Path(parchive + .toString() + + "_md5")); + if (isArchive) {
+ String tmpArchive = parchive.toString().toLowerCase(); + if (tmpArchive.endsWith(".jar")) { + RunJar.unJar(new File(parchive.toString()), new File(parchive + .getParent().toString())); + } else if (tmpArchive.endsWith(".zip")) { + FileUtil.unZip(new File(parchive.toString()), new File(parchive + .getParent().toString())); + + } + // otherwise do nothing: the file has already been + // copied into the dir as is + } + cacheStatus.currentStatus = true; + cacheStatus.md5 = checkSum; + } + if (isArchive) + return cacheStatus.localLoadPath; + else + return cacheFilePath(cacheStatus.localLoadPath); + + } + + // Checks if the cache has already been localized and is fresh + private static boolean ifExistsAndFresh(CacheStatus lcacheStatus, URI cache, + FileSystem dfs, String confMD5, Configuration conf) throws IOException { + // compute the md5 of the crc + byte[] digest = null; + byte[] fsDigest = createMD5(cache, conf); + byte[] confDigest = StringUtils.hexStringToByte(confMD5); + // check for existence of the cache + if (lcacheStatus.currentStatus == false) { + return false; + } else { + digest = lcacheStatus.md5; + if (!MessageDigest.isEqual(confDigest, fsDigest)) { + throw new IOException("Inconsistency in data caching, " + + "Cache archives have been changed"); + } else { + if (!MessageDigest.isEqual(confDigest, digest)) { + // needs refreshing + return false; + } else { + // does not need any refreshing + return true; + } + } + } + } + + /** + * Returns the md5 of the checksum file for a given dfs file. + * This method also creates the file filename_md5, whose existence + * signifies that a new cache has been loaded into dfs. So if you want to + * refresh the cache, you need to delete this md5 file as well.
+ * @param cache The cache to get the md5 checksum for + * @param conf configuration + * @return md5 of the crc of the cache parameter + * @throws IOException + */ + public static byte[] createMD5(URI cache, Configuration conf) + throws IOException { + byte[] b = new byte[CRC_BUFFER_SIZE]; + byte[] digest = null; + + FileSystem fileSystem = getFileSystem(cache, conf); + String filename = cache.getPath(); + Path filePath = new Path(filename); + Path md5File = new Path(filePath.getParent().toString() + Path.SEPARATOR + + filePath.getName() + "_md5"); + MessageDigest md5 = null; + try { + md5 = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException na) { + // do nothing + } + if (!fileSystem.exists(md5File)) { + FSInputStream fsStream = fileSystem.openRaw(FileSystem + .getChecksumFile(filePath)); + int read = fsStream.read(b); + while (read != -1) { + md5.update(b, 0, read); + read = fsStream.read(b); + } + fsStream.close(); + digest = md5.digest(); + + FSDataOutputStream out = fileSystem.create(md5File); + out.write(digest); + out.close(); + } else { + FSInputStream fsStream = fileSystem.openRaw(md5File); + digest = new byte[md5.getDigestLength()]; + // assuming reading 16 bytes once is not a problem + // though it should be checked if 16 bytes have been read or not + int read = fsStream.read(digest); + fsStream.close(); + } + + return digest; + } + + private static String getFileSysName(URI url) { + String fsname = url.getScheme(); + if ("dfs".equals(fsname)) { + String host = url.getHost(); + int port = url.getPort(); + return (port == (-1)) ? 
host : (host + ":" + port); + } else { + return null; + } + } + + private static FileSystem getFileSystem(URI cache, Configuration conf) + throws IOException { + String fileSysName = getFileSysName(cache); + if (fileSysName != null) + return FileSystem.getNamed(fileSysName, conf); + else + return FileSystem.get(conf); + } + + /** + * Set the configuration with the given set of archives + * @param archives The list of archives that need to be localized + * @param conf Configuration which will be changed + */ + public static void setCacheArchives(URI[] archives, Configuration conf) { + String sarchives = StringUtils.uriToString(archives); + conf.set("mapred.cache.archives", sarchives); + } + + /** + * Set the configuration with the given set of files + * @param files The list of files that need to be localized + * @param conf Configuration which will be changed + */ + public static void setCacheFiles(URI[] files, Configuration conf) { + String sfiles = StringUtils.uriToString(files); + conf.set("mapred.cache.files", sfiles); + } + + /** + * Get cache archives set in the Configuration + * @param conf The configuration which contains the archives + * @return A URI array of the caches set in the Configuration + * @throws IOException + */ + public static URI[] getCacheArchives(Configuration conf) throws IOException { + return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives")); + } + + /** + * Get cache files set in the Configuration + * @param conf The configuration which contains the files + * @return A URI array of the files set in the Configuration + * @throws IOException + */ + + public static URI[] getCacheFiles(Configuration conf) throws IOException { + return StringUtils.stringToURI(conf.getStrings("mapred.cache.files")); + } + + /** + * Return the path array of the localized caches + * @param conf Configuration that contains the localized archives + * @return A path array of localized caches + * @throws IOException + */ + public static Path[] 
getLocalCacheArchives(Configuration conf) + throws IOException { + return StringUtils.stringToPath(conf + .getStrings("mapred.cache.localArchives")); + } + + /** + * Return the path array of the localized files + * @param conf Configuration that contains the localized files + * @return A path array of localized files + * @throws IOException + */ + public static Path[] getLocalCacheFiles(Configuration conf) + throws IOException { + return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles")); + } + + /** + * Get the md5 checksums of the archives + * @param conf The configuration which stores the md5s + * @return a string array of md5 checksums + * @throws IOException + */ + public static String[] getArchiveMd5(Configuration conf) throws IOException { + return conf.getStrings("mapred.cache.archivemd5"); + } + + + /** + * Get the md5 checksums of the files + * @param conf The configuration which stores the md5s + * @return a string array of md5 checksums + * @throws IOException + */ + public static String[] getFileMd5(Configuration conf) throws IOException { + return conf.getStrings("mapred.cache.filemd5"); + } + + /** + * Set the md5 checksums used to verify the archives to be localized + * @param conf Configuration which stores the md5s + * @param md5 comma separated list of md5 checksums of the .crc's of archives. + * The order should be the same as the order in which the archives are added + */ + public static void setArchiveMd5(Configuration conf, String md5) { + conf.set("mapred.cache.archivemd5", md5); + } + + /** + * Set the md5 checksums used to verify the files to be localized + * @param conf Configuration which stores the md5s + * @param md5 comma separated list of md5 checksums of the .crc's of files.
+ * The order should be the same as the order in which the files are added + */ + public static void setFileMd5(Configuration conf, String md5) { + conf.set("mapred.cache.filemd5", md5); + } + + /** + * Set the conf to contain the locations of the localized archives + * @param conf The conf to modify to contain the localized caches + * @param str a comma separated list of local archives + */ + public static void setLocalArchives(Configuration conf, String str) { + conf.set("mapred.cache.localArchives", str); + } + + /** + * Set the conf to contain the locations of the localized files + * @param conf The conf to modify to contain the localized caches + * @param str a comma separated list of local files + */ + public static void setLocalFiles(Configuration conf, String str) { + conf.set("mapred.cache.localFiles", str); + } + + /** + * Add an archive to be localized to the conf + * @param uri The uri of the cache to be localized + * @param conf Configuration to add the cache to + */ + public static void addCacheArchive(URI uri, Configuration conf) { + String archives = conf.get("mapred.cache.archives"); + conf.set("mapred.cache.archives", archives == null ? uri.toString() + : archives + "," + uri.toString()); + } + + /** + * Add a file to be localized to the conf + * @param uri The uri of the cache to be localized + * @param conf Configuration to add the cache to + */ + public static void addCacheFile(URI uri, Configuration conf) { + String files = conf.get("mapred.cache.files"); + conf.set("mapred.cache.files", files == null ?
uri.toString() : files + "," + + uri.toString()); + } + + private static class CacheStatus { + // false if not loaded yet, true if loaded + public boolean currentStatus; + + // the local load path of this cache + public Path localLoadPath; + + // number of instances using this cache + public int refcount; + + // The md5 checksum of the crc file of this cache + public byte[] md5; + } + +} \ No newline at end of file Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=443497&r1=443496&r2=443497 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Thu Sep 14 15:13:43 2006 @@ -17,6 +17,9 @@ package org.apache.hadoop.fs; import java.io.*; +import java.util.Enumeration; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; import org.apache.hadoop.conf.Configuration; @@ -230,5 +233,68 @@ } return dst; } - + + /** + * Takes an input dir and returns the disk usage (du) of that local directory. + * Very basic implementation. + * + * @param dir + * The input dir to get the disk space of this local dir + * @return The total disk space of the input local directory + */ + public static long getDU(File dir) { + long size = 0; + if (!dir.exists()) + return 0; + if (!dir.isDirectory()) { + return dir.length(); + } else { + size = dir.length(); + File[] allFiles = dir.listFiles(); + for (int i = 0; i < allFiles.length; i++) { + size = size + getDU(allFiles[i]); + } + return size; + } + } + + /** + * Given a File input it will unzip the file into the unzip directory + * passed as the second parameter + * @param inFile The zip file as input + * @param unzipDir The unzip directory where to unzip the zip file.
+ * @throws IOException + */ + public static void unZip(File inFile, File unzipDir) throws IOException { + Enumeration entries; + ZipFile zipFile = new ZipFile(inFile); + ; + try { + entries = zipFile.entries(); + while (entries.hasMoreElements()) { + ZipEntry entry = (ZipEntry) entries.nextElement(); + if (!entry.isDirectory()) { + InputStream in = zipFile.getInputStream(entry); + try { + File file = new File(unzipDir, entry.getName()); + file.getParentFile().mkdirs(); + OutputStream out = new FileOutputStream(file); + try { + byte[] buffer = new byte[8192]; + int i; + while ((i = in.read(buffer)) != -1) { + out.write(buffer, 0, i); + } + } finally { + out.close(); + } + } finally { + in.close(); + } + } + } + } finally { + zipFile.close(); + } + } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=443497&r1=443496&r2=443497 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Thu Sep 14 15:13:43 2006 @@ -21,7 +21,7 @@ import org.apache.hadoop.ipc.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.util.*; - +import org.apache.hadoop.filecache.*; import java.io.*; import java.net.*; import java.util.*; @@ -227,7 +227,8 @@ JobConf job = new JobConf(jobFile); return submitJob(job); } - + + /** * Submit a job to the MR system */ @@ -244,11 +245,39 @@ Path submitJobDir = new Path(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36)); Path submitJobFile = new Path(submitJobDir, "job.xml"); Path submitJarFile = new Path(submitJobDir, "job.jar"); - - String originalJarPath = job.getJar(); - FileSystem fs = getFs(); - + // try getting the md5 of the archives + URI[] tarchives = DistributedCache.getCacheArchives(job); 
+ URI[] tfiles = DistributedCache.getCacheFiles(job); + if ((tarchives != null) || (tfiles != null)) { + // prepare these archives for md5 checksums + if (tarchives != null) { + String md5Archives = StringUtils.byteToHexString(DistributedCache + .createMD5(tarchives[0], job)); + for (int i = 1; i < tarchives.length; i++) { + md5Archives = md5Archives + + "," + + StringUtils.byteToHexString(DistributedCache + .createMD5(tarchives[i], job)); + } + DistributedCache.setArchiveMd5(job, md5Archives); + //job.set("mapred.cache.archivemd5", md5Archives); + } + if (tfiles != null) { + String md5Files = StringUtils.byteToHexString(DistributedCache + .createMD5(tfiles[0], job)); + for (int i = 1; i < tfiles.length; i++) { + md5Files = md5Files + + "," + + StringUtils.byteToHexString(DistributedCache + .createMD5(tfiles[i], job)); + } + DistributedCache.setFileMd5(job, md5Files); + //"mapred.cache.filemd5", md5Files); + } + } + + String originalJarPath = job.getJar(); short replication = (short)job.getInt("mapred.submit.replication", 10); if (originalJarPath != null) { // copy jar to JobTracker's fs Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=443497&r1=443496&r2=443497 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Sep 14 15:13:43 2006 @@ -171,6 +171,7 @@ String dirs = get("mapred.input.dir"); set("mapred.input.dir", dirs == null ? 
dir.toString() : dirs + "," + dir); } + public Path[] getInputPaths() { String dirs = get("mapred.input.dir", ""); ArrayList list = Collections.list(new StringTokenizer(dirs, ",")); @@ -197,8 +198,10 @@ set("user.name", user); } + + /** - * Set whether the framework shoul keep the intermediate files for + * Set whether the framework should keep the intermediate files for * failed tasks. */ public void setKeepFailedTaskFiles(boolean keep) { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=443497&r1=443496&r2=443497 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Sep 14 15:13:43 2006 @@ -20,11 +20,12 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.util.*; - +import org.apache.hadoop.filecache.*; import java.io.*; import java.util.jar.*; import java.util.Vector; import java.util.Enumeration; +import java.net.URI; /** Base class that runs a task in a separate process. 
Tasks are run in a * separate process in order to isolate the map/reduce system code from bugs in @@ -61,25 +62,71 @@ */ public void close() throws IOException {} + private String stringifyPathArray(Path[] p){ + if (p == null){ + return null; + } + String str = p[0].toString(); + for (int i = 1; i < p.length; i++){ + str = str + "," + p[i].toString(); + } + return str; + } + public final void run() { try { - + + //before preparing the job localize + //all the archives + + URI[] archives = DistributedCache.getCacheArchives(conf); + URI[] files = DistributedCache.getCacheFiles(conf); + if ((archives != null) || (files != null)) { + if (archives != null) { + String[] md5 = DistributedCache.getArchiveMd5(conf); + Path[] p = new Path[archives.length]; + for (int i = 0; i < archives.length;i++){ + p[i] = DistributedCache.getLocalCache(archives[i], conf, conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i]); + } + DistributedCache.setLocalArchives(conf, stringifyPathArray(p)); + } + if ((files != null)) { + String[] md5 = DistributedCache.getFileMd5(conf); + Path[] p = new Path[files.length]; + for (int i = 0; i < files.length;i++){ + p[i] = DistributedCache.getLocalCache(files[i], conf, conf.getLocalPath(TaskTracker + .getCacheSubdir()), false, md5[i]); + } + DistributedCache.setLocalFiles(conf, stringifyPathArray(p)); + } + + // sets the paths to local archives and paths + Path localTaskFile = new Path(t.getJobFile()); + FileSystem localFs = FileSystem.getNamed("local", conf); + localFs.delete(localTaskFile); + OutputStream out = localFs.create(localTaskFile); + try { + conf.write(out); + } finally { + out.close(); + } + } + if (! 
prepare()) { return; } String sep = System.getProperty("path.separator"); - File workDir = new File(new File(t.getJobFile()).getParent(), "work"); - workDir.mkdirs(); - StringBuffer classPath = new StringBuffer(); // start with same classpath as parent process classPath.append(System.getProperty("java.class.path")); classPath.append(sep); - + File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work"); + workDir.mkdirs(); + String jar = conf.getJar(); - if (jar != null) { // if jar exists, it into workDir - RunJar.unJar(new File(jar), workDir); + if (jar != null) { + // if the jar exists, it has already been unjarred into workDir File[] libs = new File(workDir, "lib").listFiles(); if (libs != null) { for (int i = 0; i < libs.length; i++) { @@ -160,10 +207,27 @@ LOG.warn(t.getTaskId()+" Reporting Diagnostics", e); } } finally { + try{ + URI[] archives = DistributedCache.getCacheArchives(conf); + URI[] files = DistributedCache.getCacheFiles(conf); + if (archives != null){ + for (int i = 0; i < archives.length; i++){ + DistributedCache.releaseCache(archives[i], conf); + } + } + if (files != null){ + for(int i = 0; i < files.length; i++){ + DistributedCache.releaseCache(files[i], conf); + } + } + }catch(IOException ie){ + LOG.warn("Error releasing caches : Cache files might not have been cleaned up"); + } tracker.reportTaskFinished(t.getTaskId()); } } + /** * Handle deprecated mapred.child.heap.size.
* If present, interpolate into mapred.child.java.opts value with @@ -238,6 +302,7 @@ logStream(process.getInputStream()); // normally empty int exit_code = process.waitFor(); + if (!killed && exit_code != 0) { throw new IOException("Task process exit with nonzero status of " + exit_code + "."); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=443497&r1=443496&r2=443497 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Sep 14 15:13:43 2006 @@ -71,6 +71,7 @@ * Map from taskId -> TaskInProgress. */ TreeMap runningTasks = null; + Map runningJobs = null; int mapTotal = 0; int reduceTotal = 0; boolean justStarted = true; @@ -89,11 +90,11 @@ static Random r = new Random(); FileSystem fs = null; - static final String SUBDIR = "taskTracker"; - + private static final String SUBDIR = "taskTracker"; + private static final String CACHEDIR = "archive"; + private static final String JOBCACHE = "jobcache"; private JobConf fConf; private MapOutputFile mapOutputFile; - private int maxCurrentTasks; private int failures; private int finishedCount[] = new int[1]; @@ -145,6 +146,14 @@ taskCleanupThread.start(); } + static String getCacheSubdir() { + return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR; + } + + static String getJobCacheSubdir() { + return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE; + } + public long getProtocolVersion(String protocol, long clientVersion) { return TaskUmbilicalProtocol.versionID; } @@ -167,6 +176,7 @@ // Clear out state tables this.tasks = new TreeMap(); this.runningTasks = new TreeMap(); + this.runningJobs = new TreeMap(); this.mapTotal = 0; this.reduceTotal = 0; this.acceptNewTasks = 
true; @@ -207,8 +217,84 @@ this.running = true; } + + // intialize the job directory + private void localizeJob(TaskInProgress tip) throws IOException { + Path localJarFile = null; + Task t = tip.getTask(); + Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t + .getJobId() + + Path.SEPARATOR + "job.xml")); + RunningJob rjob = null; + synchronized (runningJobs) { + if (!runningJobs.containsKey(t.getJobId())) { + rjob = new RunningJob(); + rjob.localized = false; + rjob.tasks = new ArrayList(); + rjob.jobFile = localJobFile; + rjob.tasks.add(tip); + runningJobs.put(t.getJobId(), rjob); + } else { + rjob = (RunningJob) runningJobs.get(t.getJobId()); + // keep this for later use when we just get a jobid to delete + // the data for + rjob.tasks.add(tip); + } + } + synchronized (rjob) { + if (!rjob.localized) { + localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t + .getJobId()) + + Path.SEPARATOR + "job.jar"); + + String jobFile = t.getJobFile(); + fs.copyToLocalFile(new Path(jobFile), localJobFile); + JobConf localJobConf = new JobConf(localJobFile); + String jarFile = localJobConf.getJar(); + if (jarFile != null) { + fs.copyToLocalFile(new Path(jarFile), localJarFile); + localJobConf.setJar(localJarFile.toString()); + FileSystem localFs = FileSystem.getNamed("local", fConf); + OutputStream out = localFs.create(localJobFile); + try { + localJobConf.write(out); + } finally { + out.close(); + } - public synchronized void shutdown() throws IOException { + // also unjar the job.jar files in workdir + File workDir = new File( + new File(localJobFile.toString()).getParent(), + "work"); + workDir.mkdirs(); + RunJar.unJar(new File(localJarFile.toString()), workDir); + } + rjob.localized = true; + } + } + launchTaskForJob(tip, new JobConf(rjob.jobFile)); + } + + private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{ + synchronized (tip) { + try { + tip.setJobConf(jobConf); + tip.launchTask(); + } catch 
(Throwable ie) { + tip.runstate = TaskStatus.FAILED; + try { + tip.cleanup(); + } catch (Throwable ie2) { + // Ignore it, we are just trying to cleanup. + } + String error = StringUtils.stringifyException(ie); + tip.reportDiagnosticInfo(error); + LOG.info(error); + } + } + } + + public synchronized void shutdown() throws IOException { shuttingDown = true; close(); if (this.server != null) { @@ -312,6 +398,12 @@ } catch (InterruptedException ie) {} } } + /**Return the DFS filesystem + * @return + */ + public FileSystem getFileSystem(){ + return fs; + } /** * Main service loop. Will stay in this loop forever. @@ -448,6 +540,10 @@ synchronized (this) { for (int i = 0; i < toCloseIds.length; i++) { Object tip = tasks.get(toCloseIds[i]); + synchronized(runningJobs){ + runningJobs.remove(((TaskInProgress) + tasks.get(toCloseIds[i])).getTask().getJobId()); + } if (tip != null) { tasksToCleanup.put(tip); } else { @@ -551,8 +647,8 @@ return true; } - - /** + + /** * Start a new task. * All exceptions are handled locally, so that we don't mess up the * task tracker. @@ -569,20 +665,10 @@ reduceTotal++; } } - synchronized (tip) { - try { - tip.launchTask(); - } catch (Throwable ie) { - tip.runstate = TaskStatus.FAILED; - try { - tip.cleanup(); - } catch (Throwable ie2) { - // Ignore it, we are just trying to cleanup. - } - String error = StringUtils.stringifyException(ie); - tip.reportDiagnosticInfo(error); - LOG.info(error); - } + try{ + localizeJob(tip); + }catch(IOException ie){ + LOG.warn("Error initializing Job " + tip.getTask().getJobId()); } } @@ -691,6 +777,7 @@ private JobConf localJobConf; private boolean keepFailedTaskFiles; private boolean alwaysKeepTaskFiles; + private boolean keepJobFiles; /** */ @@ -702,60 +789,52 @@ this.lastProgressReport = System.currentTimeMillis(); this.defaultJobConf = conf; localJobConf = null; + keepJobFiles = false; } - - /** - * Some fields in the Task object need to be made machine-specific. 
- * So here, edit the Task's fields appropriately. - */ - private void localizeTask(Task t) throws IOException { - this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + - task.getTaskId()); - Path localJobFile = - this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.xml"); - Path localJarFile = - this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.jar"); - - String jobFile = t.getJobFile(); - fs.copyToLocalFile(new Path(jobFile), localJobFile); - t.setJobFile(localJobFile.toString()); - - localJobConf = new JobConf(localJobFile); - localJobConf.set("mapred.local.dir", - this.defaultJobConf.get("mapred.local.dir")); - String jarFile = localJobConf.getJar(); - if (jarFile != null) { - fs.copyToLocalFile(new Path(jarFile), localJarFile); - localJobConf.setJar(localJarFile.toString()); - } - task.localizeConfiguration(localJobConf); - - FileSystem localFs = FileSystem.getNamed("local", fConf); - OutputStream out = localFs.create(localJobFile); - try { - localJobConf.write(out); - } finally { - out.close(); - } - // set the task's configuration to the local job conf - // rather than the default. 
- t.setConf(localJobConf); - keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles(); + + private void localizeTask(Task task) throws IOException{ + Path localTaskDir = + new Path(this.defaultJobConf.getLocalPath(SUBDIR+ Path.SEPARATOR + + JOBCACHE + Path.SEPARATOR + + task.getJobId()), task.getTaskId()); + FileSystem localFs = FileSystem.getNamed("local", fConf); + localFs.mkdirs(localTaskDir); + Path localTaskFile = new Path(localTaskDir, "job.xml"); + task.setJobFile(localTaskFile.toString()); + localJobConf.set("mapred.local.dir", + fConf.get("mapred.local.dir")); + + localJobConf.set("mapred.task.id", task.getTaskId()); + keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles(); + task.localizeConfiguration(localJobConf); + OutputStream out = localFs.create(localTaskFile); + try { + localJobConf.write(out); + } finally { + out.close(); + } + task.setConf(localJobConf); String keepPattern = localJobConf.getKeepTaskFilesPattern(); if (keepPattern != null) { - alwaysKeepTaskFiles = + keepJobFiles = true; + alwaysKeepTaskFiles = Pattern.matches(keepPattern, task.getTaskId()); } else { alwaysKeepTaskFiles = false; } } - + /** */ public Task getTask() { return task; } + public void setJobConf(JobConf lconf){ + this.localJobConf = lconf; + keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles(); + } + /** */ public synchronized TaskStatus createStatus() { @@ -876,11 +955,18 @@ * finished. 
If the task is still running, kill it (and clean up */ public synchronized void jobHasFinished() throws IOException { + if (getRunState() == TaskStatus.RUNNING) { killAndCleanup(false); } else { cleanup(); } + if (keepJobFiles) + return; + // delete the job diretory for this task + // since the job is done/failed + this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + + JOBCACHE + Path.SEPARATOR + task.getJobId()); } /** @@ -934,10 +1020,13 @@ } } } - this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + taskId); - } + this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + + JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR + + taskId); + } } + // /////////////////////////////////////////////////////////////// // TaskUmbilicalProtocol ///////////////////////////////////////////////////////////////// @@ -1034,6 +1123,16 @@ } else { LOG.warn("Unknown child with bad map output: "+taskid+". Ignored."); } + } + + /** + * The datastructure for initializing a job + */ + static class RunningJob{ + Path jobFile; + // keep this for later use + ArrayList tasks; + boolean localized; } /** Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java?view=diff&rev=443497&r1=443496&r2=443497 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java Thu Sep 14 15:13:43 2006 @@ -18,7 +18,10 @@ import java.io.PrintWriter; import java.io.StringWriter; +import java.net.URI; +import java.net.URISyntaxException; import java.text.DecimalFormat; +import org.apache.hadoop.fs.*; /** * General string utils @@ -104,5 +107,82 @@ sbuf.append(strs[idx]); } return sbuf.toString(); + } + + /** + * Given an array of bytes it will convert the bytes to a hex string + * 
representation of the bytes + * @param bytes + * @return hex string representation of the byte array + */ + public static String byteToHexString(byte bytes[]) { + StringBuffer retString = new StringBuffer(); + for (int i = 0; i < bytes.length; ++i) { + retString.append(Integer.toHexString(0x0100 + (bytes[i] & 0x00FF)) + .substring(1)); + } + return retString.toString(); + } + + /** + * Given a hexstring this will return the byte array corresponding to the + * string + * @param hex the hex String array + * @return a byte array that is a hex string representation of the given + * string. The size of the byte array is therefore hex.length/2 + */ + public static byte[] hexStringToByte(String hex) { + byte[] bts = new byte[hex.length() / 2]; + for (int i = 0; i < bts.length; i++) { + bts[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16); + } + return bts; + } + /** + * + * @param uris + * @return + */ + public static String uriToString(URI[] uris){ + String ret = null; + ret = uris[0].toString(); + for(int i = 1; i < uris.length;i++){ + ret = ret + "," + uris[i].toString(); + } + return ret; + } + + /** + * + * @param str + * @return + */ + public static URI[] stringToURI(String[] str){ + if (str == null) + return null; + URI[] uris = new URI[str.length]; + for (int i = 0; i < str.length;i++){ + try{ + uris[i] = new URI(str[i]); + }catch(URISyntaxException ur){ + System.out.println("Exception in specified URI's " + StringUtils.stringifyException(ur)); + //making sure its asssigned to null in case of an error + uris[i] = null; + } + } + return uris; + } + + /** + * + * @param str + * @return + */ + public static Path[] stringToPath(String[] str){ + Path[] p = new Path[str.length]; + for (int i = 0; i < str.length;i++){ + p[i] = new Path(str[i]); + } + return p; } } Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java?view=auto&rev=443497 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java Thu Sep 14 15:13:43 2006 @@ -0,0 +1,205 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import java.io.*; +import java.util.*; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.*; +import org.apache.hadoop.mapred.MapReduceBase; +import java.io.*; +import org.apache.hadoop.filecache.*; +import java.net.URI; +import java.net.URISyntaxException; + +public class MRCaching { + static String testStr = "This is a test file " + "used for testing caching " + + "jars, zip and normal files."; + + /** + * Using the wordcount example and adding caching to it. The cache + * archives/files are set and then are checked in the map if they have been + * localized or not. 
+ */ + public static class MapClass extends MapReduceBase implements Mapper { + JobConf conf; + + private final static IntWritable one = new IntWritable(1); + + private Text word = new Text(); + + public void configure(JobConf jconf) { + conf = jconf; + try { + Path[] localArchives = DistributedCache.getLocalCacheArchives(conf); + Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); + FileSystem fs = FileSystem.get(conf); + // read the cached files (unzipped, unjarred and text) + // and put it into a single file /tmp/test.txt + Path file = new Path("/tmp"); + fs.mkdirs(file); + Path fileOut = new Path(file, "test.txt"); + fs.delete(file); + DataOutputStream out = fs.create(fileOut); + + for (int i = 0; i < localArchives.length; i++) { + // read out the files from these archives + File f = new File(localArchives[i].toString()); + File txt = new File(f, "test.txt"); + FileInputStream fin = new FileInputStream(txt); + DataInputStream din = new DataInputStream(fin); + String str = din.readLine(); + din.close(); + out.writeBytes(str); + out.writeBytes("\n"); + } + for (int i = 0; i < localFiles.length; i++) { + // read out the files from these archives + File txt = new File(localFiles[i].toString()); + FileInputStream fin = new FileInputStream(txt); + DataInputStream din = new DataInputStream(fin); + String str = din.readLine(); + out.writeBytes(str); + out.writeBytes("\n"); + } + out.close(); + } catch (IOException ie) { + System.out.println(StringUtils.stringifyException(ie)); + } + } + + public void map(WritableComparable key, Writable value, + OutputCollector output, Reporter reporter) throws IOException { + String line = ((Text) value).toString(); + StringTokenizer itr = new StringTokenizer(line); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + output.collect(word, one); + } + + } + } + + /** + * A reducer class that just emits the sum of the input values. 
+ */ + public static class ReduceClass extends MapReduceBase implements Reducer { + + public void reduce(WritableComparable key, Iterator values, + OutputCollector output, Reporter reporter) throws IOException { + int sum = 0; + while (values.hasNext()) { + sum += ((IntWritable) values.next()).get(); + } + output.collect(key, new IntWritable(sum)); + } + } + + public static boolean launchMRCache(String jobTracker, String indir, + String outdir, String fileSys, JobConf conf, String input) + throws IOException { + final Path inDir = new Path(indir); + final Path outDir = new Path(outdir); + FileSystem fs = FileSystem.getNamed(fileSys, conf); + fs.delete(outDir); + fs.mkdirs(inDir); + + { + DataOutputStream file = fs.create(new Path(inDir, "part-0")); + file.writeBytes(input); + file.close(); + } + conf.set("fs.default.name", fileSys); + conf.set("mapred.job.tracker", jobTracker); + conf.setJobName("cachetest"); + + // the keys are words (strings) + conf.setOutputKeyClass(Text.class); + // the values are counts (ints) + conf.setOutputValueClass(IntWritable.class); + + conf.setMapperClass(MRCaching.MapClass.class); + conf.setCombinerClass(MRCaching.ReduceClass.class); + conf.setReducerClass(MRCaching.ReduceClass.class); + conf.setInputPath(inDir); + conf.setOutputPath(outDir); + conf.setNumMapTasks(1); + conf.setNumReduceTasks(1); + conf.setSpeculativeExecution(false); + Path localPath = new Path("build/test/cache"); + Path txtPath = new Path(localPath, new Path("test.txt")); + Path jarPath = new Path(localPath, new Path("test.jar")); + Path zipPath = new Path(localPath, new Path("test.zip")); + Path cacheTest = new Path("/tmp/cachedir"); + fs.delete(cacheTest); + fs.mkdirs(cacheTest); + fs.copyFromLocalFile(txtPath, cacheTest); + fs.copyFromLocalFile(jarPath, cacheTest); + fs.copyFromLocalFile(zipPath, cacheTest); + // setting the cached archives to zip, jar and simple text files + String archive1 = "dfs://" + fileSys + "/tmp/cachedir/test.jar"; + String archive2 = 
"dfs://" + fileSys + "/tmp/cachedir/test.zip"; + String file1 = "dfs://" + fileSys + "/tmp/cachedir/test.txt"; + URI uri1 = null; + URI uri2 = null; + URI uri3 = null; + try{ + uri1 = new URI(archive1); + uri2 = new URI(archive2); + uri3 = new URI(file1); + } catch(URISyntaxException ur){ + } + DistributedCache.addCacheArchive(uri1, conf); + DistributedCache.addCacheArchive(uri2, conf); + DistributedCache.addCacheFile(uri3, conf); + JobClient.runJob(conf); + int count = 0; + // after the job ran check to see if the the input from the localized cache + // match the real string. check if there are 3 instances or not. + Path result = new Path("/tmp/test.txt"); + { + BufferedReader file = new BufferedReader(new InputStreamReader(fs + .open(result))); + String line = file.readLine(); + while (line != null) { + if (!testStr.equals(line)) + return false; + count++; + line = file.readLine(); + + } + file.close(); + } + if (count != 3) + return false; + + return true; + + } +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=auto&rev=443497 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Thu Sep 14 15:13:43 2006 @@ -0,0 +1,72 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.*; +import java.util.*; +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.*; + +/** + * A JUnit test to test caching with DFS + * + * @author Mahadev Konar + */ +public class TestMiniMRDFSCaching extends TestCase { + + public void testWithDFS() throws IOException { + MiniMRCluster mr = null; + MiniDFSCluster dfs = null; + String namenode = null; + FileSystem fileSys = null; + try { + Configuration conf = new Configuration(); + dfs = new MiniDFSCluster(65314, conf, true); + fileSys = dfs.getFileSystem(); + namenode = fileSys.getName(); + mr = new MiniMRCluster(50050, 50060, 2, namenode, true); + JobConf jconf = new JobConf(); + // run the wordcount example with caching + boolean ret = MRCaching.launchMRCache("localhost:50050", + "/testing/wc/input", + "/testing/wc/output", namenode, + jconf, + "The quick brown fox\nhas many silly\n" + + "red fox sox\n"); + assertTrue("Archives not matching", ret); + } finally { + if (fileSys != null) { + fileSys.close(); + } + if (dfs != null) { + dfs.shutdown(); + } + if (mr != null) { + mr.shutdown(); + } + } + } + + public static void main(String[] argv) throws Exception { + TestMiniMRDFSCaching td = new TestMiniMRDFSCaching(); + td.testWithDFS(); + } +} Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=443497&r1=443496&r2=443497 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Thu Sep 14 15:13:43 2006 @@ -36,6 +36,14 @@ double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local"); double error = Math.abs(Math.PI - estimate); assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01)); + JobConf jconf = new JobConf(); + // run the wordcount example with caching + boolean ret = MRCaching.launchMRCache("localhost:60030", "/tmp/wc/input", + "/tmp/wc/output", "local", jconf, + "The quick brown fox\nhas many silly\n" + + "red fox sox\n"); + // assert the number of lines read during caching + assertTrue("Failed test archives not matching", ret); } finally { if (mr != null) { mr.shutdown(); } } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=443497&r1=443496&r2=443497 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Thu Sep 14 15:13:43 2006 @@ -95,12 +95,14 @@ * @param taskDirs the task ids that should be present */ private static void checkTaskDirectories(MiniMRCluster mr, + String[] jobIds, String[] taskDirs) { mr.waitUntilIdle(); int trackers = mr.getNumTaskTrackers(); List neededDirs = new ArrayList(Arrays.asList(taskDirs)); boolean[] found = new boolean[taskDirs.length]; for(int i=0; i < trackers; ++i) { + int numNotDel = 0; File localDir = new 
File(mr.getTaskTrackerLocalDir(i)); File trackerDir = new File(localDir, "taskTracker"); assertTrue("local dir " + localDir + " does not exist.", @@ -113,7 +115,7 @@ System.out.println("Local " + localDir + ": " + contents[j]); } for(int j=0; j < trackerContents.length; ++j) { - System.out.println("Local " + trackerDir + ": " + trackerContents[j]); + System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]); } for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) { String name = contents[fileIdx]; @@ -123,13 +125,11 @@ localDir, idx != -1); assertTrue("Matching output directory not found " + name + " in " + trackerDir, - new File(trackerDir, name).isDirectory()); + new File(new File(new File(trackerDir, "jobcache"), jobIds[idx]), name).isDirectory()); found[idx] = true; + numNotDel++; } } - assertTrue("The local directory had " + contents.length + - " and task tracker directory had " + trackerContents.length + - " items.", contents.length == trackerContents.length + 1); } for(int i=0; i< found.length; i++) { assertTrue("Directory " + taskDirs[i] + " not found", found[i]); @@ -155,7 +155,7 @@ jobTrackerName, namenode); double error = Math.abs(Math.PI - estimate); assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01)); - checkTaskDirectories(mr, new String[]{}); + checkTaskDirectories(mr, new String[]{}, new String[]{}); // Run a word count example JobConf jobConf = new JobConf(); @@ -168,7 +168,7 @@ 3, 1); assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" + "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result); - checkTaskDirectories(mr, new String[]{"task_0002_m_000001_0"}); + checkTaskDirectories(mr, new String[]{"job_0002"}, new String[]{"task_0002_m_000001_0"}); } finally { if (fileSys != null) { fileSys.close(); } Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar?view=auto&rev=443497 
============================================================================== Binary file - no diff available. Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt?view=auto&rev=443497 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt Thu Sep 14 15:13:43 2006 @@ -0,0 +1 @@ +This is a test file used for testing caching jars, zip and normal files. Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip?view=auto&rev=443497 ============================================================================== Binary file - no diff available. Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream
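The new StringUtils.byteToHexString and hexStringToByte helpers are inverses of each other. Adding 0x100 to each unsigned byte forces Integer.toHexString to produce three digits, and substring(1) keeps the low two, so a byte such as 0x0f is emitted as "0f" rather than "f". The standalone class below reproduces that logic outside Hadoop so the round trip can be checked in isolation. HexRoundTrip is a hypothetical name, and this is a sketch of the technique rather than the committed code.

```java
import java.util.Arrays;

/** Hypothetical standalone copy of the hex helpers added to StringUtils. */
public class HexRoundTrip {

    // Each byte becomes exactly two lowercase hex digits: (b & 0xFF) is the
    // unsigned value 0..255, +0x100 makes it "100".."1ff", substring(1) drops the '1'.
    public static String byteToHexString(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(Integer.toHexString(0x100 + (b & 0xFF)).substring(1));
        }
        return sb.toString();
    }

    // Inverse: parse each pair of hex digits back into one (signed) byte.
    // Assumes an even-length string of valid hex digits.
    public static byte[] hexStringToByte(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = {0x00, 0x0f, (byte) 0xab, (byte) 0xff};
        String hex = byteToHexString(data);
        System.out.println(hex);                                        // prints 000fabff
        System.out.println(Arrays.equals(data, hexStringToByte(hex)));  // prints true
    }
}
```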
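StringUtils.uriToString in the diff joins a URI[] with commas but reads uris[0] unconditionally, so as written it throws on an empty array (and on null). The hypothetical codec below guards those cases and parses the joined form back. UriListCodec and its String-taking stringToURI are illustrative assumptions, not Hadoop API. Unlike the committed stringToURI, which prints the error and stores null, this version lets URI.create fail loudly with an IllegalArgumentException. RFC 3986 permits commas inside URIs, so a comma-joined list is only unambiguous when the cached paths contain none.

```java
import java.net.URI;
import java.util.Arrays;

/** Hypothetical null/empty-safe variant of the comma-joined URI helpers. */
public class UriListCodec {

    // Join URIs with commas; returns null for a null or empty array instead of throwing.
    public static String uriToString(URI[] uris) {
        if (uris == null || uris.length == 0) {
            return null;
        }
        StringBuilder sb = new StringBuilder(uris[0].toString());
        for (int i = 1; i < uris.length; i++) {
            sb.append(',').append(uris[i].toString());
        }
        return sb.toString();
    }

    // Split a comma-separated value back into URIs. URI.create throws
    // IllegalArgumentException on malformed input rather than yielding null.
    public static URI[] stringToURI(String value) {
        if (value == null || value.isEmpty()) {
            return null;
        }
        String[] parts = value.split(",");
        URI[] uris = new URI[parts.length];
        for (int i = 0; i < parts.length; i++) {
            uris[i] = URI.create(parts[i].trim());
        }
        return uris;
    }

    public static void main(String[] args) {
        // Same shape as the archive URIs built in MRCaching.launchMRCache.
        URI[] archives = {
            URI.create("dfs://localhost:65314/tmp/cachedir/test.jar"),
            URI.create("dfs://localhost:65314/tmp/cachedir/test.zip")
        };
        String joined = uriToString(archives);
        System.out.println(joined);
        System.out.println(Arrays.equals(archives, stringToURI(joined))); // prints true
    }
}
```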
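TaskTracker.localizeJob in the diff uses two locks so that a job's job.xml and job.jar are copied and unjarred only once per tracker. It takes a short lock on the runningJobs map to get or create the per-job RunningJob record, then a per-job lock to perform the expensive localization if it has not happened yet. Tasks of different jobs can therefore localize in parallel, while a second task of the same job waits for the first to finish. The model below keeps only that locking structure. The class, its counter and the taskCount helper are hypothetical, and a counter increment stands in for the real file copying.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the localize-once pattern in TaskTracker.localizeJob. */
public class JobLocalizer {

    /** Per-job record, analogous to TaskTracker.RunningJob in the diff. */
    static class RunningJob {
        final List<String> tasks = new ArrayList<>(); // guarded by runningJobs
        boolean localized = false;                    // guarded by this RunningJob
    }

    private final Map<String, RunningJob> runningJobs = new HashMap<>();
    // Atomic because different jobs increment it while holding different locks.
    private final AtomicInteger localizations = new AtomicInteger();

    public void localizeJob(String jobId, String taskId) {
        RunningJob rjob;
        // Step 1: get-or-create the job record under the (briefly held) map lock.
        synchronized (runningJobs) {
            rjob = runningJobs.get(jobId);
            if (rjob == null) {
                rjob = new RunningJob();
                runningJobs.put(jobId, rjob);
            }
            rjob.tasks.add(taskId);
        }
        // Step 2: localize at most once, holding only this job's lock.
        synchronized (rjob) {
            if (!rjob.localized) {
                // Stand-in for copying job.xml/job.jar and unjarring into work/.
                localizations.incrementAndGet();
                rjob.localized = true;
            }
        }
        // The real code then launches the task with the job-level JobConf.
    }

    public int localizationCount() {
        return localizations.get();
    }

    public int taskCount(String jobId) {
        synchronized (runningJobs) {
            RunningJob rjob = runningJobs.get(jobId);
            return rjob == null ? 0 : rjob.tasks.size();
        }
    }

    public static void main(String[] args) {
        JobLocalizer tracker = new JobLocalizer();
        tracker.localizeJob("job_0002", "task_0002_m_000001_0");
        tracker.localizeJob("job_0002", "task_0002_r_000000_0");
        tracker.localizeJob("job_0003", "task_0003_m_000000_0");
        System.out.println(tracker.localizationCount()); // prints 2
    }
}
```

In the committed close loop, runningJobs.remove(...) dereferences tasks.get(toCloseIds[i]) before the existing tip != null check. As written, it would therefore throw a NullPointerException for an unknown task id. Moving the removal inside that check avoids this.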
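After this change, a TaskTracker keeps its local data in three places:

- Per-job state under taskTracker/jobcache/<jobId>: job.xml, job.jar and the unjarred work directory.
- Per-task state under taskTracker/jobcache/<jobId>/<taskId>: the task's localized job.xml.
- DistributedCache data under taskTracker/archive.

Task cleanup deletes only the <taskId> directory. jobHasFinished deletes the whole <jobId> directory unless a keep-task-files pattern is set. TaskRunner locates the shared work directory by walking up two levels from the task's job.xml. The sketch below computes these paths with java.nio.file. It assumes a single local directory; the real code calls JobConf.getLocalPath, which may choose among several mapred.local.dir entries. The class, method names and example local directory are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical path helpers mirroring the jobcache layout introduced in this commit. */
public class TaskTrackerLayout {

    // taskTracker/archive (TaskTracker.getCacheSubdir)
    static Path cacheDir(Path localDir) {
        return localDir.resolve("taskTracker").resolve("archive");
    }

    // taskTracker/jobcache/<jobId> (TaskTracker.getJobCacheSubdir + job id)
    static Path jobDir(Path localDir, String jobId) {
        return localDir.resolve("taskTracker").resolve("jobcache").resolve(jobId);
    }

    // Job-level files written once per job by localizeJob.
    static Path jobXml(Path localDir, String jobId) {
        return jobDir(localDir, jobId).resolve("job.xml");
    }

    static Path jobJar(Path localDir, String jobId) {
        return jobDir(localDir, jobId).resolve("job.jar");
    }

    // Task-level job.xml written by TaskInProgress.localizeTask.
    static Path taskJobXml(Path localDir, String jobId, String taskId) {
        return jobDir(localDir, jobId).resolve(taskId).resolve("job.xml");
    }

    // Mirrors TaskRunner: new File(new File(jobFile).getParentFile().getParent(), "work"),
    // i.e. <taskId>/job.xml -> <taskId> -> <jobId> -> <jobId>/work.
    static Path workDirFromTaskJobFile(Path taskJobFile) {
        return taskJobFile.getParent().getParent().resolve("work");
    }

    public static void main(String[] args) {
        Path local = Paths.get("/tmp/mapred/local"); // hypothetical mapred.local.dir
        Path taskXml = taskJobXml(local, "job_0002", "task_0002_m_000001_0");
        System.out.println(taskXml);
        System.out.println(workDirFromTaskJobFile(taskXml));
        System.out.println(cacheDir(local));
    }
}
```

Because the work directory resolves to the same <jobId>/work that localizeJob unjars into, every task of a job shares one unpacked copy of job.jar.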