hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject svn commit: r635050 - in /hadoop/core/trunk: ./ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/security/ src/test/org/apache/hadoop/fs/ src/test/o...
Date Sat, 08 Mar 2008 19:18:48 GMT
Author: nigel
Date: Sat Mar  8 11:18:45 2008
New Revision: 635050

URL: http://svn.apache.org/viewvc?rev=635050&view=rev
Log:
HADOOP-2915. Fixed FileSystem.CACHE so that a username is included in the cache key. Contributed
by Tsz Wo (Nicholas) SZE.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Mar  8 11:18:45 2008
@@ -286,6 +286,9 @@
     they were looking at the same config variables (Chris Douglas via
     acmurthy) 
 
+    HADOOP-2915. Fixed FileSystem.CACHE so that a username is included
+    in the cache key. (Tsz Wo (Nicholas), SZE via nigel)
+
 Release 0.16.0 - 2008-02-07
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java Sat Mar  8 11:18:45 2008
@@ -56,11 +56,10 @@
       boolean mayExit = false;
       MiniMRCluster mr = null;
       MiniDFSCluster dfs = null; 
-      FileSystem fileSys = null;
       try{
         Configuration conf = new Configuration();
         dfs = new MiniDFSCluster(conf, 1, true, null);
-        fileSys = dfs.getFileSystem();
+        FileSystem fileSys = dfs.getFileSystem();
         String namenode = fileSys.getName();
         mr  = new MiniMRCluster(1, namenode, 3);
         // During tests, the default Configuration will use a local mapred
@@ -99,6 +98,8 @@
           
         job = new StreamJob(argv, mayExit);     
         job.go();
+
+	fileSys = dfs.getFileSystem();
         String line = null;
         String line2 = null;
         Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
@@ -116,7 +117,6 @@
         assertEquals(cacheString + "\t", line);
         assertEquals(cacheString2 + "\t", line2);
       } finally{
-        if (fileSys != null) { fileSys.close(); }
         if (dfs != null) { dfs.shutdown(); }
         if (mr != null) { mr.shutdown();}
       }

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Sat Mar  8 11:18:45 2008
@@ -53,11 +53,10 @@
       boolean mayExit = false;
       MiniMRCluster mr = null;
       MiniDFSCluster dfs = null; 
-      FileSystem fileSys = null;
       try{
         Configuration conf = new Configuration();
         dfs = new MiniDFSCluster(conf, 1, true, null);
-        fileSys = dfs.getFileSystem();
+        FileSystem fileSys = dfs.getFileSystem();
         String namenode = fileSys.getName();
         mr  = new MiniMRCluster(1, namenode, 3);
         // During tests, the default Configuration will use a local mapred
@@ -91,6 +90,8 @@
           
         job = new StreamJob(argv, mayExit);      
         job.go();
+
+        fileSys = dfs.getFileSystem();
         String line = null;
         Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
                                                 new Path(OUTPUT_DIR),
@@ -104,7 +105,6 @@
         }
         assertEquals(cacheString + "\t", line);
       } finally{
-        if (fileSys != null) { fileSys.close(); }
         if (dfs != null) { dfs.shutdown(); }
         if (mr != null) { mr.shutdown();}
       }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Sat Mar  8 11:18:45 2008
@@ -28,15 +28,15 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.*;
-import org.apache.hadoop.fs.permission.AccessControlException;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /****************************************************************
  * An abstract base class for a fairly generic filesystem.  It
  * may be implemented as a distributed filesystem, or as a "local"
  * one that reflects the locally-connected disk.  The local version
- * exists for small Hadopp instances and for testing.
+ * exists for small Hadoop instances and for testing.
  *
  * <p>
  *
@@ -50,14 +50,14 @@
  * The local implementation is {@link LocalFileSystem} and distributed
  * implementation is {@link DistributedFileSystem}.
  *****************************************************************/
-public abstract class FileSystem extends Configured {
+public abstract class FileSystem extends Configured implements Closeable {
   private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
 
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FileSystem");
 
-  // cache indexed by URI scheme and authority
-  private static final Map<String,Map<String,FileSystem>> CACHE
-    = new HashMap<String,Map<String,FileSystem>>();
+  /** FileSystem cache */
+  private static final Cache CACHE = new Cache();
+
   /**
    * Parse the cmd-line args, starting at i.  Remove consumed args
    * from array.  We expect param in the form:
@@ -172,9 +172,7 @@
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
    * The entire URI is passed to the FileSystem instance's initialize method.
    */
-  public static synchronized FileSystem get(URI uri, Configuration conf)
-    throws IOException {
-
+  public static FileSystem get(URI uri, Configuration conf) throws IOException {
     String scheme = uri.getScheme();
     String authority = uri.getAuthority();
 
@@ -190,27 +188,7 @@
       }
     }
 
-    Map<String,FileSystem> authorityToFs = CACHE.get(scheme);
-    if (authorityToFs == null) {
-      if (CACHE.isEmpty()) {
-        Runtime.getRuntime().addShutdownHook(clientFinalizer);
-      }
-      authorityToFs = new HashMap<String,FileSystem>();
-      CACHE.put(scheme, authorityToFs);
-    }
-
-    FileSystem fs = authorityToFs.get(authority);
-    if (fs == null) {
-      Class fsClass = conf.getClass("fs."+scheme+".impl", null);
-      if (fsClass == null) {
-        throw new IOException("No FileSystem for scheme: " + scheme);
-      }
-      fs = (FileSystem)ReflectionUtils.newInstance(fsClass, conf);
-      fs.initialize(uri, conf);
-      authorityToFs.put(authority, fs);
-    }
-
-    return fs;
+    return CACHE.get(uri, conf);
   }
 
   private static class ClientFinalizer extends Thread {
@@ -230,14 +208,8 @@
    * 
    * @throws IOException
    */
-  public static synchronized void closeAll() throws IOException {
-    Set<String> scheme = new HashSet<String>(CACHE.keySet());
-    for (String s : scheme) {
-      Set<String> authority = new HashSet<String>(CACHE.get(s).keySet());
-      for (String a : authority) {
-        CACHE.get(s).get(a).close();
-      }
-    }
+  public static void closeAll() throws IOException {
+    CACHE.closeAll();
   }
 
   /** Make sure that a path specifies a FileSystem. */
@@ -1173,22 +1145,7 @@
    * release any held locks.
    */
   public void close() throws IOException {
-    URI uri = getUri();
-    synchronized (FileSystem.class) {
-      Map<String,FileSystem> authorityToFs = CACHE.get(uri.getScheme());
-      if (authorityToFs != null) {
-        authorityToFs.remove(uri.getAuthority());
-        if (authorityToFs.isEmpty()) {
-          CACHE.remove(uri.getScheme());
-          if (CACHE.isEmpty() && !clientFinalizer.isAlive()) {
-            if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
-              LOG.info("Could not cancel cleanup thread, though no " +
-                       "FileSystems are open");
-            }
-          }
-        }
-      }
-    }
+    CACHE.remove(new Cache.Key(getUri(), getConf()));
   }
 
   /** Return the total size of all files in the filesystem.*/
@@ -1274,5 +1231,119 @@
    */
   public void setOwner(Path p, String username, String groupname
       ) throws IOException {
+  }
+
+  private static FileSystem createFileSystem(URI uri, Configuration conf
+      ) throws IOException {
+    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
+    if (clazz == null) {
+      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
+    }
+    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
+    fs.initialize(uri, conf);
+    return fs;
+  }
+
+  /** Caching FileSystem objects */
+  private static class Cache {
+    final Map<Key, FsRef> map = new HashMap<Key, FsRef>();
+
+    synchronized FileSystem get(URI uri, Configuration conf) throws IOException{
+      Key key = new Key(uri, conf);
+      FsRef ref = map.get(key);
+      FileSystem fs = ref == null? null: ref.get();
+      if (fs == null) {
+        if (map.isEmpty() && !clientFinalizer.isAlive()) {
+          Runtime.getRuntime().addShutdownHook(clientFinalizer);
+        }
+
+        fs = createFileSystem(uri, conf);
+        map.put(key, new FsRef(fs, key));
+      }
+      return fs;
+    }
+
+    synchronized FsRef remove(Key key) {
+      FsRef ref = map.remove(key);
+      if (map.isEmpty() && !clientFinalizer.isAlive()) {
+        if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
+          LOG.info("Could not cancel cleanup thread, though no " +
+                   "FileSystems are open");
+        }
+      }
+      return ref;
+    }
+
+    synchronized void closeAll() throws IOException {
+      List<IOException> exceptions = new ArrayList<IOException>();
+      for(FsRef ref : new ArrayList<FsRef>(map.values())) {
+        FileSystem fs = ref.get();
+        if (fs != null) {
+          try {
+            fs.close();
+          }
+          catch(IOException ioe) {
+            exceptions.add(ioe);
+          }
+        }
+        else {
+          remove(ref.key);
+        }        
+      }
+
+      if (!exceptions.isEmpty()) {
+        throw MultipleIOException.createIOException(exceptions);
+      }
+    }
+
+    /** Reference of FileSystem which contains the cache key */
+    private static class FsRef {
+      final FileSystem fs;
+      final Key key;
+      
+      FsRef(FileSystem fs, Key key) {
+        this.fs = fs;
+        this.key = key;
+      }
+
+      FileSystem get() {return fs;}
+    }
+
+    /** FileSystem.Cache.Key */
+    private static class Key {
+      final String scheme;
+      final String authority;
+      final String username;
+
+      Key(URI uri, Configuration conf) throws IOException {
+        scheme = uri.getScheme();
+        authority = uri.getAuthority();
+        UserGroupInformation ugi = UserGroupInformation.readFrom(conf);
+        username = ugi == null? null: ugi.getUserName();
+      }
+
+      /** {@inheritDoc} */
+      public int hashCode() {
+        return (scheme + authority + username).hashCode();
+      }
+
+      static boolean isEqual(Object a, Object b) {
+        return a == b || (a != null && a.equals(b));        
+      }
+
+      /** {@inheritDoc} */
+      public boolean equals(Object obj) {
+        if (obj == this) {
+          return true;
+        }
+        if (obj != null && obj instanceof Key) {
+          Key that = (Key)obj;
+          return isEqual(this.scheme, that.scheme)
+                 && isEqual(this.authority, that.authority)
+                 && isEqual(this.username, that.username);
+        }
+        return false;        
+      }
+    }
   }
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Sat Mar  8 11:18:45 2008
@@ -147,7 +147,6 @@
   //for serving map output to the other nodes
 
   static Random r = new Random();
-  FileSystem fs = null;
   private static final String SUBDIR = "taskTracker";
   private static final String CACHEDIR = "archive";
   private static final String JOBCACHE = "jobcache";
@@ -663,6 +662,7 @@
             throw new IOException("Not able to create job directory "
                                   + jobDir.toString());
         }
+        FileSystem fs =FileSystem.getNamed(jobClient.getFilesystemName(),fConf);
         fs.copyToLocalFile(new Path(jobFile), localJobFile);
         JobConf localJobConf = new JobConf(localJobFile);
         
@@ -819,12 +819,6 @@
     return jobClient;
   }
         
-  /**Return the DFS filesystem
-   */
-  public FileSystem getFileSystem(){
-    return fs;
-  }
-  
   /** Return the port at which the tasktracker bound to */
   public synchronized InetSocketAddress getTaskTrackerReportAddress() {
     return taskReportAddress;
@@ -864,7 +858,6 @@
    */
   State offerService() throws Exception {
     long lastHeartbeat = 0;
-    this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
 
     while (running && !shuttingDown) {
       try {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java Sat Mar  8 11:18:45 2008
@@ -17,8 +17,13 @@
  */
 package org.apache.hadoop.security;
 
+import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 /** A {@link Writable} abstract class for storing user and groups information.
@@ -53,4 +58,15 @@
    * @return an array of group names
    */
   public abstract String[] getGroupNames();
+
+  /** Read a {@link UserGroupInformation} from conf */
+  public static UserGroupInformation readFrom(Configuration conf
+      ) throws IOException {
+    try {
+      return UnixUserGroupInformation.readFromConf(conf,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME);
+    } catch (LoginException e) {
+      throw (IOException)new IOException().initCause(e);
+    }
+  }
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Sat Mar  8 11:18:45 2008
@@ -32,10 +32,8 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.InputFormatBase;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
@@ -43,9 +41,11 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.LongSumReducer;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 
 public class TestFileSystem extends TestCase {
-  private static final Log LOG = InputFormatBase.LOG;
+  private static final Log LOG = FileSystem.LOG;
 
   private static Configuration conf = new Configuration();
   private static int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096);
@@ -460,4 +460,17 @@
     }
   }
 
+  static Configuration createConf4Testing(String username) throws Exception {
+    Configuration conf = new Configuration();
+    UnixUserGroupInformation.saveToConf(conf,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME,
+        new UnixUserGroupInformation(username, new String[]{"group"}));
+    return conf;    
+  }
+
+  public void testFsCache() throws Exception {
+    Configuration c1 = createConf4Testing("foo");
+    Configuration c2 = createConf4Testing("bar");
+    assertFalse(FileSystem.get(c1) == FileSystem.get(c2));
+  }
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Sat Mar  8 11:18:45 2008
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
@@ -43,7 +44,6 @@
 public abstract class ClusterMapReduceTestCase extends TestCase {
   private MiniDFSCluster dfsCluster = null;
   private MiniMRCluster mrCluster = null;
-  private FileSystem fileSystem = null;
 
   /**
    * Creates Hadoop Cluster and DFS before a test case is run.
@@ -79,11 +79,10 @@
         }
       }
       dfsCluster = new MiniDFSCluster(conf, 2, reformatDFS, null);
-      fileSystem = dfsCluster.getFileSystem();
 
       ConfigurableMiniMRCluster.setConfiguration(props);
       //noinspection deprecation
-      mrCluster = new ConfigurableMiniMRCluster(2, fileSystem.getName(), 1);
+      mrCluster = new ConfigurableMiniMRCluster(2, getFileSystem().getName(), 1);
     }
   }
 
@@ -129,7 +128,6 @@
     if (dfsCluster != null) {
       dfsCluster.shutdown();
       dfsCluster = null;
-      fileSystem = null;
     }
   }
 
@@ -150,9 +148,10 @@
    * TestCases should use this Filesystem instance.
    *
    * @return the filesystem used by Hadoop.
+   * @throws IOException 
    */
-  protected FileSystem getFileSystem() {
-    return fileSystem;
+  protected FileSystem getFileSystem() throws IOException {
+    return dfsCluster.getFileSystem();
   }
 
   /**

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Sat Mar  8 11:18:45 2008
@@ -25,6 +25,7 @@
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 
 /**
  * This class creates a single-process Map-Reduce cluster for junit testing.
@@ -45,6 +46,7 @@
   private List<Thread> taskTrackerThreadList = new ArrayList<Thread>();
     
   private String namenode;
+  private UnixUserGroupInformation ugi = null;
     
   /**
    * An inner class that runs a job tracker.
@@ -102,7 +104,6 @@
   class TaskTrackerRunner implements Runnable {
     volatile TaskTracker tt;
     int trackerId;
-    JobConf conf = createJobConf();
     // the localDirs for this taskTracker
     String[] localDirs;
     volatile boolean isInitialized = false;
@@ -114,7 +115,7 @@
       this.trackerId = trackerId;
       this.numDir = numDir;
       localDirs = new String[numDir];
-      conf = createJobConf();
+      JobConf conf = createJobConf();
       if (hostname != null) {
         conf.set("slave.host.name", hostname);
       }
@@ -249,6 +250,11 @@
     result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
     result.set("mapred.job.tracker.http.address", 
                         "127.0.0.1:" + jobTrackerInfoPort);
+    if (ugi != null) {
+      result.set("mapred.system.dir", "/mapred/system");
+      UnixUserGroupInformation.saveToConf(result,
+          UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
+    }
     // for debugging have all task output sent to the test output
     JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL);
     return result;
@@ -300,7 +306,7 @@
       boolean taskTrackerFirst, int numDir)
   throws IOException {
     this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
-        taskTrackerFirst, 1, null);
+        taskTrackerFirst, numDir, null);
   }
   
   public MiniMRCluster(int jobTrackerPort,
@@ -319,7 +325,14 @@
                        String namenode,
                        boolean taskTrackerFirst, int numDir,
                        String[] racks, String[] hosts) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
+        taskTrackerFirst, numDir, racks, hosts, null);
+  }
 
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, boolean taskTrackerFirst,
+      int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi
+      ) throws IOException {
     if (racks != null && racks.length < numTaskTrackers) {
       LOG.error("Invalid number of racks specified. It should be at least " +
           "equal to the number of tasktrackers");
@@ -342,6 +355,7 @@
     this.jobTrackerInfoPort = 0;
     this.numTaskTrackers = numTaskTrackers;
     this.namenode = namenode;
+    this.ugi = ugi;
 
     // Create the JobTracker
     jobTracker = new JobTrackerRunner();

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java Sat Mar  8 11:18:45 2008
@@ -175,15 +175,15 @@
     try {
       JobClient.runJob(jobConf);
       Path inFile = new Path(outDir, "reduce-out");
-      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
-                                                           jobConf);
+      SequenceFile.Reader reader = new SequenceFile.Reader(
+          FileSystem.get(jobConf), inFile, jobConf);
       IntWritable numInside = new IntWritable();
       IntWritable numOutside = new IntWritable();
       reader.next(numInside, numOutside);
       reader.close();
       estimate = (double) (numInside.get()*4.0)/(numMaps*numPoints);
     } finally {
-      fileSys.delete(tmpDir);
+      FileSystem.get(jobConf).delete(tmpDir);
     }
     
     return estimate;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Sat Mar  8 11:18:45 2008
@@ -174,7 +174,6 @@
                    "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
           
     } finally {
-      if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();
       }
@@ -209,7 +208,6 @@
       
     } 
     finally {
-      if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();
       }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Sat Mar  8 11:18:45 2008
@@ -94,7 +94,6 @@
       // Run sort-validator to check if sort worked correctly
       runSortValidator(mr.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
     } finally {
-      if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();
       }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Sat Mar  8 11:18:45 2008
@@ -116,7 +116,7 @@
    * @param mr the map-reduce cluster
    * @param taskDirs the task ids that should be present
    */
-  private static void checkTaskDirectories(MiniMRCluster mr,
+  static void checkTaskDirectories(MiniMRCluster mr,
                                            String[] jobIds,
                                            String[] taskDirs) {
     mr.waitUntilIdle();
@@ -159,7 +159,55 @@
       assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
     }
   }
-  
+
+  static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+    LOG.info("runPI");
+    double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobconf);
+    double error = Math.abs(Math.PI - estimate);
+    assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+    checkTaskDirectories(mr, new String[]{}, new String[]{});
+  }
+
+  static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
+    LOG.info("runWordCount");
+    // Run a word count example
+    // Keeping tasks that match this pattern
+    jobConf.setKeepTaskFilesPattern("task_[^_]*_[0-9]*_m_000001_.*");
+    TestResult result;
+    final Path inDir = new Path("./wc/input");
+    final Path outDir = new Path("./wc/output");
+    result = launchWordCount(jobConf, inDir, outDir,
+                             "The quick brown fox\nhas many silly\n" + 
+                             "red fox sox\n",
+                             3, 1);
+    assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+                 "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
+    String jobid = result.job.getJobID();
+    String taskid = "task_" + jobid.substring(4) + "_m_000001_0";
+    checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid});
+    // test with maps=0
+    jobConf = mr.createJobConf();
+    result = launchWordCount(jobConf, inDir, outDir, "owen is oom", 0, 1);
+    assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
+    // Run a job with input and output going to localfs even though the 
+    // default fs is hdfs.
+    {
+      FileSystem localfs = FileSystem.getLocal(jobConf);
+      String TEST_ROOT_DIR =
+        new File(System.getProperty("test.build.data","/tmp"))
+        .toString().replace(' ', '+');
+      Path localIn = localfs.makeQualified
+                        (new Path(TEST_ROOT_DIR + "/local/in"));
+      Path localOut = localfs.makeQualified
+                        (new Path(TEST_ROOT_DIR + "/local/out"));
+      result = launchWordCount(jobConf, localIn, localOut,
+                               "all your base belong to us", 1, 1);
+      assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n", 
+                   result.output);
+      assertTrue("outputs on localfs", localfs.exists(localOut));
+    }
+  }
+
   public void testWithDFS() throws IOException {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
@@ -170,52 +218,11 @@
       Configuration conf = new Configuration();
       dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
-      mr = new MiniMRCluster(taskTrackers, fileSys.getName(), 1);
-      double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
-                                           mr.createJobConf());
-      double error = Math.abs(Math.PI - estimate);
-      assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
-      checkTaskDirectories(mr, new String[]{}, new String[]{});
-          
-      // Run a word count example
-      JobConf jobConf = mr.createJobConf();
-      // Keeping tasks that match this pattern
-      jobConf.setKeepTaskFilesPattern("task_[^_]*_[0-9]*_m_000001_.*");
-      TestResult result;
-      final Path inDir = new Path("/testing/wc/input");
-      final Path outDir = new Path("/testing/wc/output");
-      result = launchWordCount(jobConf, inDir, outDir,
-                               "The quick brown fox\nhas many silly\n" + 
-                               "red fox sox\n",
-                               3, 1);
-      assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
-                   "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
-      String jobid = result.job.getJobID();
-      String taskid = "task_" + jobid.substring(4) + "_m_000001_0";
-      checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid});
-      // test with maps=0
-      jobConf = mr.createJobConf();
-      result = launchWordCount(jobConf, inDir, outDir, "owen is oom", 0, 1);
-      assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
-      // Run a job with input and output going to localfs even though the 
-      // default fs is hdfs.
-      {
-        FileSystem localfs = FileSystem.getLocal(jobConf);
-        String TEST_ROOT_DIR =
-          new File(System.getProperty("test.build.data","/tmp"))
-          .toString().replace(' ', '+');
-        Path localIn = localfs.makeQualified
-                          (new Path(TEST_ROOT_DIR + "/local/in"));
-        Path localOut = localfs.makeQualified
-                          (new Path(TEST_ROOT_DIR + "/local/out"));
-        result = launchWordCount(jobConf, localIn, localOut,
-                                 "all your base belong to us", 1, 1);
-        assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n", 
-                     result.output);
-        assertTrue("outputs on localfs", localfs.exists(localOut));
-      }
+      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+
+      runPI(mr, mr.createJobConf());
+      runWordCount(mr, mr.createJobConf());
     } finally {
-      if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();
       }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Sat Mar  8 11:18:45 2008
@@ -116,6 +116,7 @@
        */
       mr = new MiniMRCluster(taskTrackers, namenode, 1, rack1, hosts3);
       jobConf = mr.createJobConf();
+      fileSys = dfs.getFileSystem();
       if (fileSys.exists(outputPath)) {
         fileSys.delete(outputPath);
       }
@@ -131,9 +132,6 @@
       mr.waitUntilIdle();
       
     } finally {
-      if (fileSys != null) { 
-        fileSys.close(); 
-      }
       if (dfs != null) { 
         dfs.shutdown(); 
       }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java Sat Mar  8 11:18:45 2008
@@ -117,7 +117,6 @@
       assertTrue(result);
           
     } finally {
-      if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown(); }
     }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=635050&r1=635049&r2=635050&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Sat Mar  8 11:18:45 2008
@@ -43,6 +43,11 @@
   private static final Log LOG =
     LogFactory.getLog(TestPipes.class.getName());
 
+  static void cleanup(FileSystem fs, Path p) throws IOException {
+    FileUtil.fullyDelete(fs, p);
+    assertFalse("output not cleaned up", fs.exists(p));
+  }
+
   public void testPipes() throws IOException {
     if (System.getProperty("compile.c++") == null) {
       LOG.info("compile.c++ is not defined, so skipping TestPipes");
@@ -50,7 +55,6 @@
     }
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
-    FileSystem fs = null;
     Path cppExamples = new Path(System.getProperty("install.c++.examples"));
     Path inputPath = new Path("/testing/in");
     Path outputPath = new Path("/testing/out");
@@ -58,20 +62,19 @@
       final int numSlaves = 2;
       Configuration conf = new Configuration();
       dfs = new MiniDFSCluster(conf, numSlaves, true, null);
-      fs = dfs.getFileSystem();
-      mr = new MiniMRCluster(numSlaves, fs.getName(), 1);
-      writeInputFile(fs, inputPath);
-      runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"), 
+      mr = new MiniMRCluster(numSlaves, dfs.getFileSystem().getName(), 1);
+      writeInputFile(dfs.getFileSystem(), inputPath);
+      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"), 
                  inputPath, outputPath, 3, 2, twoSplitOutput);
-      FileUtil.fullyDelete(fs, outputPath);
-      assertFalse("output not cleaned up", fs.exists(outputPath));
-      runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"), 
+      cleanup(dfs.getFileSystem(), outputPath);
+
+      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"), 
                  inputPath, outputPath, 3, 0, noSortOutput);
-      FileUtil.fullyDelete(fs, outputPath);
-      assertFalse("output not cleaned up", fs.exists(outputPath));
-      runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-part"),
+      cleanup(dfs.getFileSystem(), outputPath);
+
+      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-part"),
                  inputPath, outputPath, 3, 2, fixedPartitionOutput);
-      runNonPipedProgram(mr, fs, new Path(cppExamples, "bin/wordcount-nopipe"));
+      runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
       mr.waitUntilIdle();
     } finally {
       mr.shutdown();
@@ -127,25 +130,29 @@
     out.close();
   }
 
-  private void runProgram(MiniMRCluster mr, FileSystem fs, 
+  private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs, 
                           Path program, Path inputPath, Path outputPath,
                           int numMaps, int numReduces, String[] expectedResults
                          ) throws IOException {
     Path wordExec = new Path("/testing/bin/application");
-    FileUtil.fullyDelete(fs, wordExec.getParent());
-    fs.copyFromLocalFile(program, wordExec);                                         
     JobConf job = mr.createJobConf();
     job.setNumMapTasks(numMaps);
     job.setNumReduceTasks(numReduces);
-    Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
-    Submitter.setIsJavaRecordReader(job, true);
-    Submitter.setIsJavaRecordWriter(job, true);
-    job.setInputPath(inputPath);
-    job.setOutputPath(outputPath);
-    RunningJob result = Submitter.submitJob(job);
-    assertTrue("pipes job failed", result.isSuccessful());
+    {
+      FileSystem fs = dfs.getFileSystem();
+      FileUtil.fullyDelete(fs, wordExec.getParent());
+      fs.copyFromLocalFile(program, wordExec);                                         
+      Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
+      Submitter.setIsJavaRecordReader(job, true);
+      Submitter.setIsJavaRecordWriter(job, true);
+      job.setInputPath(inputPath);
+      job.setOutputPath(outputPath);
+      RunningJob result = Submitter.submitJob(job);
+      assertTrue("pipes job failed", result.isSuccessful());
+    }
+
     List<String> results = new ArrayList<String>();
-    for (Path p:FileUtil.stat2Paths(fs.listStatus(outputPath,
+    for (Path p:FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
     		                        new OutputLogFilter()))) {
       results.add(TestMiniMRWithDFS.readOutput(p, job));
     }
@@ -165,7 +172,7 @@
    * @param program the program to run
    * @throws IOException
    */
-  private void runNonPipedProgram(MiniMRCluster mr, FileSystem dfs,
+  private void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
                                   Path program) throws IOException {
     JobConf job = mr.createJobConf();
     job.setInputFormat(WordCountInputFormat.class);
@@ -176,8 +183,11 @@
     Path outDir = new Path(testDir, "output");
     Path wordExec = new Path("/testing/bin/application");
     Path jobXml = new Path(testDir, "job.xml");
-    FileUtil.fullyDelete(dfs, wordExec.getParent());
-    dfs.copyFromLocalFile(program, wordExec);
+    {
+      FileSystem fs = dfs.getFileSystem();
+      FileUtil.fullyDelete(fs, wordExec.getParent());
+      fs.copyFromLocalFile(program, wordExec);
+    }
     DataOutputStream out = local.create(new Path(inDir, "part0"));
     out.writeBytes("i am a silly test\n");
     out.writeBytes("you are silly\n");
@@ -195,7 +205,7 @@
                                   "-input", inDir.toString(),
                                   "-output", outDir.toString(),
                                   "-program", 
-                                  dfs.makeQualified(wordExec).toString(),
+                        dfs.getFileSystem().makeQualified(wordExec).toString(),
                                   "-reduces", "2"});
     } catch (Exception e) {
       assertTrue("got exception: " + StringUtils.stringifyException(e), false);



Mime
View raw message