From: cutting@apache.org
To: hadoop-commits@lucene.apache.org
Date: Tue, 31 Oct 2006 20:13:33 -0000
Subject: svn commit: r469635 - in /lucene/hadoop/trunk: ./ src/contrib/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/conf/ src/java/org/apache/hadoop/filecache...

Author: cutting
Date: Tue Oct 31 12:13:31 2006
New Revision: 469635

URL: http://svn.apache.org/viewvc?view=rev&rev=469635
Log:
HADOOP-576.  Enable contrib/streaming to use the file cache.  Contributed by Mahadev.
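The new options added by this change are exercised the same way TestSymLink (added below) does it: the -cacheFile value is a DFS URI whose #fragment names the symlink that will appear in each task's working directory. A minimal, hedged sketch of a client program using the new surface; the class name, paths, ports and mapper command are illustrative only and not part of this commit:

    // Sketch: submit a streaming job that pulls cache.txt out of the file
    // cache and exposes it as "testlink" in the task working directory.
    import org.apache.hadoop.streaming.StreamJob;

    public class CacheFileExample {
      public static void main(String[] args) throws Exception {
        String[] argv = new String[] {
          "-input",   "/user/me/input.txt",
          "-output",  "/user/me/out",
          "-mapper",  "xargs cat",        // reads the symlinked cache file
          "-reducer", "cat",
          "-jobconf", "fs.default.name=localhost:8020",
          "-jobconf", "mapred.job.tracker=localhost:50030",
          "-cacheFile", "dfs://localhost:8020/user/me/cache.txt#testlink"
        };
        // Parses the args, sets up the JobConf and submits, as in the test below.
        new StreamJob(argv, false).go();
      }
    }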
Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/contrib/build-contrib.xml
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 31 12:13:31 2006
@@ -87,6 +87,10 @@
     reduce tasks which use MapFile to still report progress while
     writing blocks to the filesystem. (cutting)
 
+24. HADOOP-576. Enable contrib/streaming to use the file cache. Also
+    extend the cache to permit symbolic links to cached items, rather
+    than local file copies. (Mahadev Konar via cutting)
+
 Release 0.7.2 - 2006-10-18

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Tue Oct 31 12:13:31 2006
@@ -350,8 +350,8 @@
-
-
+
+

Modified: lucene/hadoop/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/build-contrib.xml?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/build-contrib.xml (original)
+++ lucene/hadoop/trunk/src/contrib/build-contrib.xml Tue Oct 31 12:13:31 2006
@@ -24,9 +24,10 @@
+
-
+
@@ -51,6 +52,7 @@
+
@@ -69,7 +71,8 @@
-
+
+
@@ -131,8 +134,10 @@
+
+
@@ -145,7 +150,7 @@
-
+

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java Tue Oct 31 12:13:31 2006
@@ -22,8 +22,12 @@
 import java.net.InetAddress;
 import java.util.*;
 
-/*
- * If we move to Java 1.5, we can get rid of this class and just use System.getenv
+/**
+ * This is a class used to get the current environment
+ * on the host machines running the map/reduce. This class
+ * assumes that setting the environment in streaming is
+ * allowed on windows/ix/linux/freebsd/sunos/solaris/hp-ux
+ * @author michel
  */
 public class Environment extends Properties {
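Since Environment extends java.util.Properties, task-side code can read a host environment variable through the inherited Properties accessors. A small, hedged sketch (the lookup and the assumption that the constructor populates the properties from the host environment follow the class comment above, not code shown in this diff):

    import java.io.IOException;
    import org.apache.hadoop.streaming.Environment;

    public class EnvironmentExample {
      public static void main(String[] args) throws IOException {
        // Environment extends Properties, so the usual Properties API applies.
        Environment env = new Environment();
        // Illustrative lookup; PATH is defined on the platforms listed above.
        String path = env.getProperty("PATH");
        System.out.println("PATH seen by the task: " + path);
      }
    }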
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java Tue Oct 31 12:13:31 2006
@@ -21,6 +21,8 @@
 import java.io.IOException;
 
 /** The main entrypoint. Usually invoked with the script bin/hadoopStreaming
+ * or bin/hadoop jar hadoop-streaming.jar args.
+ * It passes all the args to StreamJob, which handles all the arguments.
  */
 public class HadoopStreaming {

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java Tue Oct 31 12:13:31 2006
@@ -23,6 +23,16 @@
 import java.util.jar.*;
 import java.util.zip.ZipException;
 
+/**
+ * This class is the main class for generating the job.jar
+ * for Hadoop Streaming jobs. It takes the files specified
+ * with the -file option and includes them in the jar. Also,
+ * hadoop-streaming is a user-level application, so all the
+ * hadoop-streaming classes that are needed in the job are also included
+ * in the job.jar.
+ * @author michel
+ *
+ */
 public class JarBuilder {
 
   public JarBuilder() {

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java Tue Oct 31 12:13:31 2006
@@ -30,7 +30,9 @@
  *
  * Note: not specifying ownerOnly maps to ownerOnly = false
  * From man chmod: If no user specs are given, the effect is as if `a' were given.
- *
+ * This class is mainly used to change permissions when files are unjarred from the
+ * job.jar. The executable specified in the mapper/reducer is set to be executable
+ * using this class.
  */
 public class MustangFile extends File {

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Oct 31 12:13:31 2006
@@ -39,7 +39,8 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.RunningJob;
-
+import org.apache.hadoop.filecache.*;
+import org.apache.hadoop.util.*;
 /** All the client-side work happens here.
  * (Jar packaging, MapRed job submission and monitoring)
  * @author Michel Tourn
@@ -54,7 +55,13 @@
     argv_ = argv;
     mayExit_ = mayExit;
   }
-
+
+  /**
+   * This is the method that actually
+   * initializes the job conf and submits the job
+   * to the jobtracker
+   * @throws IOException
+   */
   public void go() throws IOException {
     init();
 
@@ -65,7 +72,7 @@
     setJobConf();
     submitAndMonitorJob();
   }
-
+
   protected void init() {
     try {
       env_ = new Environment();
@@ -157,6 +164,10 @@
     return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
   }
 
+  /**
+   * This method parses the command line args
+   * to a hadoop streaming job
+   */
   void parseArgv() {
     if (argv_.length == 0) {
       exitUsage(false);
     }
@@ -219,7 +230,22 @@
       } else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
         i++;
         inReaderSpec_ = s;
-      } else {
+      } else if((s = optionArg(argv_, i, "-cacheArchive", false)) != null) {
+        i++;
+        if (cacheArchives == null)
+          cacheArchives = s;
+        else
+          cacheArchives = cacheArchives + "," + s;
+      } else if((s = optionArg(argv_, i, "-cacheFile", false)) != null) {
+        i++;
+        System.out.println(" the val of s is " + s);
+        if (cacheFiles == null)
+          cacheFiles = s;
+        else
+          cacheFiles = cacheFiles + "," + s;
+        System.out.println(" the val of cachefiles is " + cacheFiles);
+      }
+      else {
         System.err.println("Unexpected argument: " + argv_[i]);
         exitUsage(false);
       }
@@ -269,6 +295,8 @@
     System.out.println("  -inputreader  Optional.");
     System.out.println("  -jobconf  = Optional. Add or override a JobConf property");
     System.out.println("  -cmdenv   = Optional. Pass env.var to streaming commands");
+    System.out.println("  -cacheFile fileNameURI");
+    System.out.println("  -cacheArchive fileNameURI");
     System.out.println("  -verbose");
     System.out.println();
     if (!detailed) {
@@ -392,7 +420,7 @@
     // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
     // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
     String runtimeClasses = userJobConfProps_.get("stream.shipped.hadoopstreaming"); // jar or class dir
-System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
+    System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
 
     if (runtimeClasses == null) {
       runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
@@ -435,6 +463,11 @@
     return jobJarName;
   }
 
+  /**
+   * This method sets the user jobconf variables specified
+   * by the user with -jobconf key=value
+   * @param doEarlyProps
+   */
   protected void setUserJobConfProps(boolean doEarlyProps) {
     Iterator it = userJobConfProps_.keySet().iterator();
     while (it.hasNext()) {
@@ -448,7 +481,17 @@
       }
     }
   }
-
+
+  /**
+   * get the uris of all the files/caches
+   */
+  protected void getURIs(String lcacheArchives, String lcacheFiles) {
+    String archives[] = StringUtils.getStrings(lcacheArchives);
+    String files[] = StringUtils.getStrings(lcacheFiles);
+    fileURIs = StringUtils.stringToURI(files);
+    archiveURIs = StringUtils.stringToURI(archives);
+  }
+
   protected void setJobConf() throws IOException {
     msg("hadoopAliasConf_ = " + hadoopAliasConf_);
     config_ = new Configuration();
@@ -548,7 +591,8 @@
         jobConf_.set(k, v);
       }
     }
-
+
+    setUserJobConfProps(false);
     // output setup is done late so we can customize for reducerNone_
     //jobConf_.setOutputDir(new File(output_));
     setOutputSpec();
@@ -561,20 +605,36 @@
 
     // last, allow user to override anything
     // (although typically used with properties we didn't touch)
-    setUserJobConfProps(false);
 
     jar_ = packageJobJar();
     if (jar_ != null) {
       jobConf_.setJar(jar_);
     }
-
+
+    if ((cacheArchives != null) || (cacheFiles != null)){
+      getURIs(cacheArchives, cacheFiles);
+      boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs);
+      if (!b)
+        fail(LINK_URI);
+      DistributedCache.createSymlink(jobConf_);
+    }
+    // set the jobconf for the caching parameters
+    if (cacheArchives != null)
+      DistributedCache.setCacheArchives(archiveURIs, jobConf_);
+    if (cacheFiles != null)
+      DistributedCache.setCacheFiles(fileURIs, jobConf_);
+
     if(verbose_) {
       listJobConfProperties();
     }
-
     msg("submitting to jobconf: " + getJobTrackerHostPort());
   }
 
+  /**
+   * Prints out the jobconf properties on stdout
+   * when verbose is specified.
+   */
   protected void listJobConfProperties()
   {
     msg("==== JobConf properties:");
@@ -765,6 +825,10 @@
   protected String comCmd_;
   protected String redCmd_;
   protected String cluster_;
+  protected String cacheFiles;
+  protected String cacheArchives;
+  protected URI[] fileURIs;
+  protected URI[] archiveURIs;
   protected ArrayList configPath_ = new ArrayList(); //
   protected String hadoopAliasConf_;
   protected String inReaderSpec_;
@@ -780,5 +844,6 @@
   protected RunningJob running_;
   protected String jobId_;
-
+  protected static String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
+      "Please specify a different link name for all of your caching URIs";
 }
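Once createSymlink is set on the JobConf, each cached item appears in the task's current working directory under the name given by the URI fragment, so a streaming command can open it like any local file. A hedged, task-side sketch; the link name "testlink" mirrors the test added below, and the wrapper class itself is illustrative, not part of this commit:

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;

    public class ReadCachedFile {
      public static void main(String[] args) throws IOException {
        // The DistributedCache symlink is created in the task working directory,
        // named after the #fragment of the -cacheFile URI (here "testlink").
        File cached = new File("testlink");
        BufferedReader in = new BufferedReader(new FileReader(cached));
        String line;
        while ((line = in.readLine()) != null) {
          System.out.println(line);   // a streaming mapper just writes to stdout
        }
        in.close();
      }
    }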
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?view=auto&rev=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Tue Oct 31 12:13:31 2006
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+/**
+ * This test case tests the symlink creation
+ * utility provided by distributed caching
+ * @author mahadev
+ *
+ */
+public class TestSymLink extends TestCase
+{
+  String INPUT_FILE = "/testing-streaming/input.txt";
+  String OUTPUT_DIR = "/testing-streaming/out";
+  String CACHE_FILE = "/testing-streaming/cache.txt";
+  String input = "check to see if we can read this none reduce";
+  String map = "xargs cat ";
+  String reduce = "cat";
+  String mapString = "testlink\n";
+  String cacheString = "This is just the cache string";
+  StreamJob job;
+
+  public TestSymLink() throws IOException
+  {
+  }
+
+  public void testSymLink()
+  {
+    try {
+      boolean mayExit = false;
+      int jobTrackerPort = 60050;
+      MiniMRCluster mr = null;
+      MiniDFSCluster dfs = null;
+      FileSystem fileSys = null;
+      try{
+        Configuration conf = new Configuration();
+        dfs = new MiniDFSCluster(8050, conf, false);
+        fileSys = dfs.getFileSystem();
+        String namenode = fileSys.getName();
+        mr  = new MiniMRCluster(jobTrackerPort, 60060, 1, namenode, true, 3);
+        // During tests, the default Configuration will use a local mapred
+        // So don't specify -config or -cluster
+        String strJobtracker = "mapred.job.tracker=" + "localhost:" + jobTrackerPort;
+        String strNamenode = "fs.default.name=" + namenode;
+        String argv[] = new String[] {
+          "-input", INPUT_FILE,
+          "-output", OUTPUT_DIR,
+          "-mapper", map,
+          "-reducer", reduce,
+          //"-verbose",
+          //"-jobconf", "stream.debug=set"
+          "-jobconf", strNamenode,
+          "-jobconf", strJobtracker,
+          "-cacheFile", "dfs://"+fileSys.getName()+CACHE_FILE + "#testlink"
+        };
+
+        fileSys.delete(new Path(OUTPUT_DIR));
+        fileSys.mkdirs(new Path(OUTPUT_DIR));
+
+        DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
+        file.writeBytes(mapString);
+        file.close();
+        file = fileSys.create(new Path(CACHE_FILE));
+        file.writeBytes(cacheString);
+        file.close();
+
+        job = new StreamJob(argv, mayExit);
+        job.go();
+        String line = null;
+        Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
+        for (int i = 0; i < fileList.length; i++){
+          System.out.println(fileList[i].toString());
+          BufferedReader bread =
+            new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
+          line = bread.readLine();
+          System.out.println(line);
+        }
+        assertEquals(cacheString + "\t", line);
+      } finally{
+        if (fileSys != null) { fileSys.close(); }
+        if (dfs != null) { dfs.shutdown(); }
+        if (mr != null) { mr.shutdown();}
+      }
+
+    } catch(Exception e) {
+      failTrace(e);
+    }
+  }
+
+  void failTrace(Exception e)
+  {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    fail(sw.toString());
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestSymLink().testSymLink();
+  }
+
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java Tue Oct 31 12:13:31 2006
@@ -302,14 +302,7 @@
    */
   public String[] getStrings(String name) {
     String valueString = get(name);
-    if (valueString == null)
-      return null;
-    StringTokenizer tokenizer = new StringTokenizer (valueString,",");
-    List values = new ArrayList();
-    while (tokenizer.hasMoreTokens()) {
-      values.add(tokenizer.nextToken());
-    }
-    return (String[])values.toArray(new String[values.size()]);
+    return StringUtils.getStrings(valueString);
   }
 
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Tue Oct 31 12:13:31 2006
@@ -43,7 +43,7 @@
   /**
    *
    * @param cache the cache to be localized, this should be specified as
-   * new URI(dfs://hostname:port/absoulte_path_to_file). If no schema
+   * new URI(dfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
    * or hostname:port is provided the file is assumed to be in the filesystem
    * being used in the Configuration
    * @param conf The Confguration file which contains the filesystem
@@ -55,12 +55,14 @@
   * @param md5 this is a mere checksum to verufy if you are using the right cache.
   * You need to pass the md5 of the crc file in DFS. This is matched against the one
   * calculated in this api and if it does not match, the cache is not localized.
+  * @param currentWorkDir this is the directory where you would want to create symlinks
+  * for the locally cached files/archives
   * @return the path to directory where the archives are unjarred in case of archives,
   * the path to the file where the file is copied locally
   * @throws IOException
   */
  public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
-                                  boolean isArchive, String md5) throws IOException {
+                                  boolean isArchive, String md5, Path currentWorkDir) throws IOException {
    String cacheId = makeRelative(cache, conf);
    CacheStatus lcacheStatus;
    Path localizedPath;
@@ -80,7 +82,7 @@
      }
    }
    synchronized (lcacheStatus) {
-      localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5);
+      localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5, currentWorkDir);
    }
    // try deleting stuff if you can
    long size = FileUtil.getDU(new File(baseDir.toString()));
@@ -157,15 +159,26 @@
  // the methoed which actually copies the caches locally and unjars/unzips them
  private static Path localizeCache(URI cache, CacheStatus cacheStatus,
-                                   Configuration conf, boolean isArchive, String md5) throws IOException {
+                                   Configuration conf, boolean isArchive, String md5, Path currentWorkDir) throws IOException {
    boolean b = true;
+    boolean doSymlink = getSymlink(conf);
    FileSystem dfs = getFileSystem(cache, conf);
    b = ifExistsAndFresh(cacheStatus, cache, dfs, md5, conf);
    if (b) {
-      if (isArchive)
+      if (isArchive) {
+        if (doSymlink)
+          FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+                           currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
+        return cacheStatus.localLoadPath;
-      else
+      }
+      else {
+        if (doSymlink)
+          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
+                           currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
+        return cacheFilePath(cacheStatus.localLoadPath);
+      }
    } else {
      // remove the old archive
      // if the old archive cannot be removed since it is being used by another
@@ -179,7 +192,6 @@
      localFs.delete(cacheStatus.localLoadPath);
      Path parchive = new Path(cacheStatus.localLoadPath,
                               new Path(cacheStatus.localLoadPath.getName()));
-      localFs.mkdirs(cacheStatus.localLoadPath);
      String cacheId = cache.getPath();
      dfs.copyToLocalFile(new Path(cacheId), parchive);
@@ -199,14 +211,23 @@
        // else will not do anyhting
        // and copy the file into the dir as it is
      }
+      // create a symlink if #NAME is specified as fragment in the
+      // symlink
      cacheStatus.currentStatus = true;
      cacheStatus.md5 = checkSum;
    }
-    if (isArchive)
+    if (isArchive){
+      if (doSymlink)
+        FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+                         currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
      return cacheStatus.localLoadPath;
-    else
+    }
+    else {
+      if (doSymlink)
+        FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
+                         currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
      return cacheFilePath(cacheStatus.localLoadPath);
-
+    }
  }

  // Checks if the cache has already been localized and is fresh
@@ -451,6 +472,75 @@
    String files = conf.get("mapred.cache.files");
    conf.set("mapred.cache.files", files == null ? uri.toString() : files + "," + uri.toString());
  }
+
+  /**
+   * This method allows you to create symlinks in the current working directory
+   * of the task to all the cache files/archives
+   * @param conf the jobconf
+   */
+  public static void createSymlink(Configuration conf){
+    conf.set("mapred.create.symlink", "yes");
+  }
+
+  /**
+   * This method checks to see if symlinks are to be created for the
+   * localized cache files in the current working directory
+   * @param conf the jobconf
+   * @return true if symlinks are to be created, else return false
+   */
+  public static boolean getSymlink(Configuration conf){
+    String result = conf.get("mapred.create.symlink");
+    if ("yes".equals(result)){
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * This method checks if there is a conflict in the fragment names
+   * of the uris. Also makes sure that each uri has a fragment. It
+   * is only to be called if you want to create symlinks for
+   * the various archives and files.
+   * @param uriFiles The uri array of urifiles
+   * @param uriArchives the uri array of uri archives
+   */
+  public static boolean checkURIs(URI[] uriFiles, URI[] uriArchives){
+    if ((uriFiles == null) && (uriArchives == null)){
+      return true;
+    }
+    if (uriFiles != null){
+      for (int i = 0; i < uriFiles.length; i++){
+        String frag1 = uriFiles[i].getFragment();
+        if (frag1 == null)
+          return false;
+        for (int j=i+1; j < uriFiles.length; j++){
+          String frag2 = uriFiles[j].getFragment();
+          if (frag2 == null)
+            return false;
+          if (frag1.equalsIgnoreCase(frag2))
+            return false;
+        }
+        if (uriArchives != null){
+          for (int j = 0; j < uriArchives.length; j++){
+            String frag2 = uriArchives[j].getFragment();
+            if (frag2 == null){
+              return false;
+            }
+            if (frag1.equalsIgnoreCase(frag2))
+              return false;
+            for (int k=j+1; k < uriArchives.length; k++){
+              String frag3 = uriArchives[k].getFragment();
+              if (frag3 == null)
+                return false;
+              if (frag2.equalsIgnoreCase(frag3))
+                return false;
+            }
+          }
+        }
+      }
+    }
+    return true;
  }

  private static class CacheStatus {
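Putting the new pieces together: checkURIs only accepts cache URIs whose fragments are present and mutually distinct, createSymlink records the request in the JobConf, and the task side then materializes each link with FileUtil.symLink (added below). A hedged sketch of those rules; the hosts, paths and link names are made up for illustration:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileUtil;

    public class CacheSymlinkExample {
      public static void main(String[] args) throws Exception {
        // Fragments name the symlinks, so they must exist and must not collide.
        URI[] ok = { new URI("dfs://nn:8020/user/me/dict.txt#dict"),
                     new URI("dfs://nn:8020/user/me/stop.txt#stop") };
        URI[] clash = { new URI("dfs://nn:8020/a.txt#data"),
                        new URI("dfs://nn:8020/b.txt#data") };   // duplicate fragment
        URI[] missing = { new URI("dfs://nn:8020/c.txt") };      // no fragment at all

        System.out.println(DistributedCache.checkURIs(ok, null));      // true
        System.out.println(DistributedCache.checkURIs(clash, null));   // false
        System.out.println(DistributedCache.checkURIs(missing, null)); // false

        // StreamJob does this once the URIs pass the check.
        Configuration conf = new Configuration();
        DistributedCache.createSymlink(conf);          // sets mapred.create.symlink=yes
        System.out.println(DistributedCache.getSymlink(conf));         // true

        // Under the covers the task side shells out to "ln -s"; illustrative paths.
        int rc = FileUtil.symLink("/tmp/cache/dict.txt", "/tmp/work/dict");
        System.out.println("ln -s exit code: " + rc);
      }
    }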
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Tue Oct 31 12:13:31 2006
@@ -301,4 +301,23 @@
      zipFile.close();
    }
  }
+
+  /**
+   * Create a soft link between a src and destination
+   * only on a local disk. HDFS does not support this
+   * @param target the target for symlink
+   * @param linkname the symlink
+   * @return value returned by the command
+   */
+  public static int symLink(String target, String linkname) throws IOException{
+    String cmd = "ln -s " + target + " " + linkname;
+    Process p = Runtime.getRuntime().exec( cmd, null );
+    int returnVal = -1;
+    try{
+      returnVal = p.waitFor();
+    } catch(InterruptedException e){
+      //do nothing as of yet
+    }
+    return returnVal;
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Oct 31 12:13:31 2006
@@ -80,7 +80,7 @@
      //before preparing the job localize
      //all the archives
-
+      File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
      URI[] archives = DistributedCache.getCacheArchives(conf);
      URI[] files = DistributedCache.getCacheFiles(conf);
      if ((archives != null) || (files != null)) {
@@ -88,7 +88,8 @@
          String[] md5 = DistributedCache.getArchiveMd5(conf);
          Path[] p = new Path[archives.length];
          for (int i = 0; i < archives.length;i++){
-            p[i] = DistributedCache.getLocalCache(archives[i], conf, conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i]);
+            p[i] = DistributedCache.getLocalCache(archives[i], conf,
+                     conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i], new Path(workDir.getAbsolutePath()));
          }
          DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
        }
@@ -97,7 +98,7 @@
          Path[] p = new Path[files.length];
          for (int i = 0; i < files.length;i++){
            p[i] = DistributedCache.getLocalCache(files[i], conf, conf.getLocalPath(TaskTracker
-                     .getCacheSubdir()), false, md5[i]);
+                     .getCacheSubdir()), false, md5[i], new Path(workDir.getAbsolutePath()));
          }
          DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
        }
@@ -123,7 +124,6 @@
    // start with same classpath as parent process
    classPath.append(System.getProperty("java.class.path"));
    classPath.append(sep);
-    File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
    workDir.mkdirs();

    String jar = conf.getJar();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java Tue Oct 31 12:13:31 2006
@@ -24,7 +24,10 @@
 import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.DecimalFormat;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
+import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.*;
 
@@ -253,4 +256,19 @@
     return buf.toString();
   }
-}
+  /**
+   * returns an array of strings
+   * @param str the comma separated string values
+   * @return the array of the comma separated string values
+   */
+  public static String[] getStrings(String str){
+    if (str == null)
+      return null;
+    StringTokenizer tokenizer = new StringTokenizer (str,",");
+    List values = new ArrayList();
+    while (tokenizer.hasMoreTokens()) {
+      values.add(tokenizer.nextToken());
+    }
+    return (String[])values.toArray(new String[values.size()]);
+  }
+}
\ No newline at end of file
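The Configuration.getStrings change above simply delegates to this new helper, and StreamJob.getURIs pairs it with StringUtils.stringToURI (which, per its use in getURIs, turns each string into a java.net.URI) to convert the comma-separated -cacheFile/-cacheArchive values into URIs. A hedged sketch of that round trip; the sample string is illustrative only:

    import java.net.URI;
    import org.apache.hadoop.util.StringUtils;

    public class GetStringsExample {
      public static void main(String[] args) {
        // Comma-separated list as collected from repeated -cacheFile options.
        String cacheFiles = "dfs://nn:8020/user/me/dict.txt#dict,dfs://nn:8020/user/me/stop.txt#stop";

        String[] parts = StringUtils.getStrings(cacheFiles);   // splits on ","
        System.out.println(parts.length);                      // 2
        System.out.println(StringUtils.getStrings(null));      // null for unset values

        // StreamJob.getURIs then converts the strings to URIs for DistributedCache.
        URI[] uris = StringUtils.stringToURI(parts);
        System.out.println(uris[0].getFragment());             // "dict"
      }
    }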