hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject svn commit: r635061 - in /hadoop/core/branches/branch-0.16: ./ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/security/ src/test/org/apache/hadoop/fs/ src/test/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/pipes/
Date Sat, 08 Mar 2008 19:56:39 GMT
Author: nigel
Date: Sat Mar  8 11:56:35 2008
New Revision: 635061

URL: http://svn.apache.org/viewvc?rev=635061&view=rev
Log:
Branch specific patch for HADOOP-2915. Fixed FileSystem.CACHE so that a username is included in the cache key. Contributed by Tsz Wo (Nicholas) SZE.

Added:
    hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java   (with props)
    hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java   (with props)
Modified:
    hadoop/core/branches/branch-0.16/CHANGES.txt
    hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
    hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
    hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/fs/FileSystem.java
    hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/security/UserGroupInformation.java
    hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/fs/TestFileSystem.java
    hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
    hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/PiEstimator.java
    hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
    hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
    hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
    hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

Modified: hadoop/core/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/CHANGES.txt?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.16/CHANGES.txt Sat Mar  8 11:56:35 2008
@@ -132,6 +132,9 @@
     they were looking at the same config variables (Chris Douglas via
     acmurthy) 
 
+    HADOOP-2915. Fixed FileSystem.CACHE so that a username is included
+    in the cache key. (Tsz Wo (Nicholas), SZE via nigel)
+
 Release 0.16.0 - 2008-02-07
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (original)
+++ hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java Sat Mar  8 11:56:35 2008
@@ -55,11 +55,10 @@
       boolean mayExit = false;
       MiniMRCluster mr = null;
       MiniDFSCluster dfs = null; 
-      FileSystem fileSys = null;
       try{
         Configuration conf = new Configuration();
         dfs = new MiniDFSCluster(conf, 1, true, null);
-        fileSys = dfs.getFileSystem();
+        FileSystem fileSys = dfs.getFileSystem();
         String namenode = fileSys.getName();
         mr  = new MiniMRCluster(1, namenode, 3);
         // During tests, the default Configuration will use a local mapred
@@ -98,6 +97,8 @@
           
         job = new StreamJob(argv, mayExit);     
         job.go();
+
+	fileSys = dfs.getFileSystem();
         String line = null;
         String line2 = null;
         Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
@@ -113,7 +114,6 @@
         assertEquals(cacheString + "\t", line);
         assertEquals(cacheString2 + "\t", line2);
       } finally{
-        if (fileSys != null) { fileSys.close(); }
         if (dfs != null) { dfs.shutdown(); }
         if (mr != null) { mr.shutdown();}
       }

Modified: hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original)
+++ hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Sat Mar  8 11:56:35 2008
@@ -52,11 +52,10 @@
       boolean mayExit = false;
       MiniMRCluster mr = null;
       MiniDFSCluster dfs = null; 
-      FileSystem fileSys = null;
       try{
         Configuration conf = new Configuration();
         dfs = new MiniDFSCluster(conf, 1, true, null);
-        fileSys = dfs.getFileSystem();
+        FileSystem fileSys = dfs.getFileSystem();
         String namenode = fileSys.getName();
         mr  = new MiniMRCluster(1, namenode, 3);
         // During tests, the default Configuration will use a local mapred
@@ -90,6 +89,8 @@
           
         job = new StreamJob(argv, mayExit);      
         job.go();
+
+        fileSys = dfs.getFileSystem();
         String line = null;
         Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
         for (int i = 0; i < fileList.length; i++){
@@ -101,7 +102,6 @@
         }
         assertEquals(cacheString + "\t", line);
       } finally{
-        if (fileSys != null) { fileSys.close(); }
         if (dfs != null) { dfs.shutdown(); }
         if (mr != null) { mr.shutdown();}
       }

Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/fs/FileSystem.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/fs/FileSystem.java Sat Mar  8 11:56:35 2008
@@ -28,15 +28,15 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.*;
-import org.apache.hadoop.fs.permission.AccessControlException;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /****************************************************************
  * An abstract base class for a fairly generic filesystem.  It
  * may be implemented as a distributed filesystem, or as a "local"
  * one that reflects the locally-connected disk.  The local version
- * exists for small Hadopp instances and for testing.
+ * exists for small Hadoop instances and for testing.
  *
  * <p>
  *
@@ -50,12 +50,12 @@
  * The local implementation is {@link LocalFileSystem} and distributed
  * implementation is {@link DistributedFileSystem}.
  *****************************************************************/
-public abstract class FileSystem extends Configured {
+public abstract class FileSystem extends Configured implements Closeable {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FileSystem");
 
-  // cache indexed by URI scheme and authority
-  private static final Map<String,Map<String,FileSystem>> CACHE
-    = new HashMap<String,Map<String,FileSystem>>();
+  /** FileSystem cache */
+  private static final Cache CACHE = new Cache();
+
   /**
    * Parse the cmd-line args, starting at i.  Remove consumed args
    * from array.  We expect param in the form:
@@ -137,9 +137,7 @@
    * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
    * The entire URI is passed to the FileSystem instance's initialize method.
    */
-  public static synchronized FileSystem get(URI uri, Configuration conf)
-    throws IOException {
-
+  public static FileSystem get(URI uri, Configuration conf) throws IOException {
     String scheme = uri.getScheme();
     String authority = uri.getAuthority();
 
@@ -147,27 +145,7 @@
       return get(conf);
     }
 
-    Map<String,FileSystem> authorityToFs = CACHE.get(scheme);
-    if (authorityToFs == null) {
-      if (CACHE.isEmpty()) {
-        Runtime.getRuntime().addShutdownHook(clientFinalizer);
-      }
-      authorityToFs = new HashMap<String,FileSystem>();
-      CACHE.put(scheme, authorityToFs);
-    }
-
-    FileSystem fs = authorityToFs.get(authority);
-    if (fs == null) {
-      Class fsClass = conf.getClass("fs."+scheme+".impl", null);
-      if (fsClass == null) {
-        throw new IOException("No FileSystem for scheme: " + scheme);
-      }
-      fs = (FileSystem)ReflectionUtils.newInstance(fsClass, conf);
-      fs.initialize(uri, conf);
-      authorityToFs.put(authority, fs);
-    }
-
-    return fs;
+    return CACHE.get(uri, conf);
   }
 
   private static class ClientFinalizer extends Thread {
@@ -187,14 +165,8 @@
    * 
    * @throws IOException
    */
-  public static synchronized void closeAll() throws IOException {
-    Set<String> scheme = new HashSet<String>(CACHE.keySet());
-    for (String s : scheme) {
-      Set<String> authority = new HashSet<String>(CACHE.get(s).keySet());
-      for (String a : authority) {
-        CACHE.get(s).get(a).close();
-      }
-    }
+  public static void closeAll() throws IOException {
+    CACHE.closeAll();
   }
 
   /** Make sure that a path specifies a FileSystem. */
@@ -1110,22 +1082,7 @@
    * release any held locks.
    */
   public void close() throws IOException {
-    URI uri = getUri();
-    synchronized (FileSystem.class) {
-      Map<String,FileSystem> authorityToFs = CACHE.get(uri.getScheme());
-      if (authorityToFs != null) {
-        authorityToFs.remove(uri.getAuthority());
-        if (authorityToFs.isEmpty()) {
-          CACHE.remove(uri.getScheme());
-          if (CACHE.isEmpty() && !clientFinalizer.isAlive()) {
-            if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
-              LOG.info("Could not cancel cleanup thread, though no " +
-                       "FileSystems are open");
-            }
-          }
-        }
-      }
-    }
+    CACHE.remove(new Cache.Key(getUri(), getConf()));
   }
 
   /** Return the total size of all files in the filesystem.*/
@@ -1211,5 +1168,119 @@
    */
   public void setOwner(Path p, String username, String groupname
       ) throws IOException {
+  }
+
+  private static FileSystem createFileSystem(URI uri, Configuration conf
+      ) throws IOException {
+    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
+    if (clazz == null) {
+      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
+    }
+    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
+    fs.initialize(uri, conf);
+    return fs;
+  }
+
+  /** Caching FileSystem objects */
+  private static class Cache {
+    final Map<Key, FsRef> map = new HashMap<Key, FsRef>();
+
+    synchronized FileSystem get(URI uri, Configuration conf) throws IOException{
+      Key key = new Key(uri, conf);
+      FsRef ref = map.get(key);
+      FileSystem fs = ref == null? null: ref.get();
+      if (fs == null) {
+        if (map.isEmpty() && !clientFinalizer.isAlive()) {
+          Runtime.getRuntime().addShutdownHook(clientFinalizer);
+        }
+
+        fs = createFileSystem(uri, conf);
+        map.put(key, new FsRef(fs, key));
+      }
+      return fs;
+    }
+
+    synchronized FsRef remove(Key key) {
+      FsRef ref = map.remove(key);
+      if (map.isEmpty() && !clientFinalizer.isAlive()) {
+        if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
+          LOG.info("Could not cancel cleanup thread, though no " +
+                   "FileSystems are open");
+        }
+      }
+      return ref;
+    }
+
+    synchronized void closeAll() throws IOException {
+      List<IOException> exceptions = new ArrayList<IOException>();
+      for(FsRef ref : new ArrayList<FsRef>(map.values())) {
+        FileSystem fs = ref.get();
+        if (fs != null) {
+          try {
+            fs.close();
+          }
+          catch(IOException ioe) {
+            exceptions.add(ioe);
+          }
+        }
+        else {
+          remove(ref.key);
+        }        
+      }
+
+      if (!exceptions.isEmpty()) {
+        throw MultipleIOException.createIOException(exceptions);
+      }
+    }
+
+    /** Reference of FileSystem which contains the cache key */
+    private static class FsRef {
+      final FileSystem fs;
+      final Key key;
+      
+      FsRef(FileSystem fs, Key key) {
+        this.fs = fs;
+        this.key = key;
+      }
+
+      FileSystem get() {return fs;}
+    }
+
+    /** FileSystem.Cache.Key */
+    private static class Key {
+      final String scheme;
+      final String authority;
+      final String username;
+
+      Key(URI uri, Configuration conf) throws IOException {
+        scheme = uri.getScheme();
+        authority = uri.getAuthority();
+        UserGroupInformation ugi = UserGroupInformation.readFrom(conf);
+        username = ugi == null? null: ugi.getUserName();
+      }
+
+      /** {@inheritDoc} */
+      public int hashCode() {
+        return (scheme + authority + username).hashCode();
+      }
+
+      static boolean isEqual(Object a, Object b) {
+        return a == b || (a != null && a.equals(b));        
+      }
+
+      /** {@inheritDoc} */
+      public boolean equals(Object obj) {
+        if (obj == this) {
+          return true;
+        }
+        if (obj != null && obj instanceof Key) {
+          Key that = (Key)obj;
+          return isEqual(this.scheme, that.scheme)
+                 && isEqual(this.authority, that.authority)
+                 && isEqual(this.username, that.username);
+        }
+        return false;        
+      }
+    }
   }
 }

Added: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java?rev=635061&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java (added)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java Sat Mar  8 11:56:35 2008
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Encapsulate a list of {@link IOException} into an {@link IOException} */
+public class MultipleIOException extends IOException {
+  /** Required by {@link java.io.Serializable} */
+  private static final long serialVersionUID = 1L;
+  
+  private final List<IOException> exceptions;
+  
+  /** Constructor is private, use {@link #createIOException(List)}. */
+  private MultipleIOException(List<IOException> exceptions) {
+    super(exceptions.size() + " exceptions " + exceptions);
+    this.exceptions = exceptions;
+  }
+
+  /** @return the underlying exceptions */
+  public List<IOException> getExceptions() {return exceptions;}
+
+  /** A convenient method to create an {@link IOException}. */
+  public static IOException createIOException(List<IOException> exceptions) {
+    if (exceptions == null || exceptions.isEmpty()) {
+      return null;
+    }
+    if (exceptions.size() == 1) {
+      return exceptions.get(0);
+    }
+    return new MultipleIOException(exceptions);
+  }
+}
\ No newline at end of file

Propchange: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/TaskTracker.java Sat Mar  8 11:56:35 2008
@@ -147,7 +147,6 @@
   //for serving map output to the other nodes
 
   static Random r = new Random();
-  FileSystem fs = null;
   private static final String SUBDIR = "taskTracker";
   private static final String CACHEDIR = "archive";
   private static final String JOBCACHE = "jobcache";
@@ -658,6 +657,7 @@
             throw new IOException("Not able to create job directory "
                                   + jobDir.toString());
         }
+        FileSystem fs =FileSystem.getNamed(jobClient.getFilesystemName(),fConf);
         fs.copyToLocalFile(new Path(jobFile), localJobFile);
         JobConf localJobConf = new JobConf(localJobFile);
         
@@ -846,12 +846,6 @@
     return jobClient;
   }
         
-  /**Return the DFS filesystem
-   */
-  public FileSystem getFileSystem(){
-    return fs;
-  }
-  
   /** Return the port at which the tasktracker bound to */
   public synchronized InetSocketAddress getTaskTrackerReportAddress() {
     return taskReportAddress;
@@ -891,7 +885,6 @@
    */
   State offerService() throws Exception {
     long lastHeartbeat = 0;
-    this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
 
     while (running && !shuttingDown) {
       try {

Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/security/UserGroupInformation.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/security/UserGroupInformation.java Sat Mar  8 11:56:35 2008
@@ -17,8 +17,13 @@
  */
 package org.apache.hadoop.security;
 
+import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 /** A {@link Writable} abstract class for storing user and groups information.
@@ -53,4 +58,15 @@
    * @return an array of group names
    */
   public abstract String[] getGroupNames();
+
+  /** Read a {@link UserGroupInformation} from conf */
+  public static UserGroupInformation readFrom(Configuration conf
+      ) throws IOException {
+    try {
+      return UnixUserGroupInformation.readFromConf(conf,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME);
+    } catch (LoginException e) {
+      throw (IOException)new IOException().initCause(e);
+    }
+  }
 }

Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/fs/TestFileSystem.java Sat Mar  8 11:56:35 2008
@@ -35,7 +35,6 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.InputFormatBase;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
@@ -43,9 +42,11 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.LongSumReducer;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 
 public class TestFileSystem extends TestCase {
-  private static final Log LOG = InputFormatBase.LOG;
+  private static final Log LOG = FileSystem.LOG;
 
   private static Configuration conf = new Configuration();
   private static int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096);
@@ -463,4 +464,17 @@
     }
   }
 
+  static Configuration createConf4Testing(String username) throws Exception {
+    Configuration conf = new Configuration();
+    UnixUserGroupInformation.saveToConf(conf,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME,
+        new UnixUserGroupInformation(username, new String[]{"group"}));
+    return conf;    
+  }
+
+  public void testFsCache() throws Exception {
+    Configuration c1 = createConf4Testing("foo");
+    Configuration c2 = createConf4Testing("bar");
+    assertFalse(FileSystem.get(c1) == FileSystem.get(c2));
+  }
 }

Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Sat Mar  8 11:56:35 2008
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
@@ -43,7 +44,6 @@
 public abstract class ClusterMapReduceTestCase extends TestCase {
   private MiniDFSCluster dfsCluster = null;
   private MiniMRCluster mrCluster = null;
-  private FileSystem fileSystem = null;
 
   /**
    * Creates Hadoop Cluster and DFS before a test case is run.
@@ -79,11 +79,10 @@
         }
       }
       dfsCluster = new MiniDFSCluster(conf, 2, reformatDFS, null);
-      fileSystem = dfsCluster.getFileSystem();
 
       ConfigurableMiniMRCluster.setConfiguration(props);
       //noinspection deprecation
-      mrCluster = new ConfigurableMiniMRCluster(2, fileSystem.getName(), 1);
+      mrCluster = new ConfigurableMiniMRCluster(2, getFileSystem().getName(), 1);
     }
   }
 
@@ -129,7 +128,6 @@
     if (dfsCluster != null) {
       dfsCluster.shutdown();
       dfsCluster = null;
-      fileSystem = null;
     }
   }
 
@@ -150,9 +148,10 @@
    * TestCases should use this Filesystem instance.
    *
    * @return the filesystem used by Hadoop.
+   * @throws IOException 
    */
-  protected FileSystem getFileSystem() {
-    return fileSystem;
+  protected FileSystem getFileSystem() throws IOException {
+    return dfsCluster.getFileSystem();
   }
 
   /**

Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Sat Mar  8 11:56:35 2008
@@ -22,6 +22,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 
 /**
  * This class creates a single-process Map-Reduce cluster for junit testing.
@@ -42,6 +43,7 @@
   private List<Thread> taskTrackerThreadList = new ArrayList<Thread>();
     
   private String namenode;
+  private UnixUserGroupInformation ugi = null;
     
   /**
    * An inner class that runs a job tracker.
@@ -97,7 +99,6 @@
   class TaskTrackerRunner implements Runnable {
     volatile TaskTracker tt;
     int trackerId;
-    JobConf conf = createJobConf();
     // the localDirs for this taskTracker
     String[] localDirs;
     volatile boolean isInitialized = false;
@@ -108,7 +109,7 @@
       this.trackerId = trackerId;
       this.numDir = numDir;
       localDirs = new String[numDir];
-      conf = createJobConf();
+      JobConf conf = createJobConf();
       conf.set("mapred.task.tracker.http.address", "0.0.0.0:0");
       conf.set("mapred.task.tracker.report.address", 
                 "127.0.0.1:" + taskTrackerPort);
@@ -132,6 +133,14 @@
       }
       conf.set("mapred.local.dir", localPath.toString());
       LOG.info("mapred.local.dir is " +  localPath);
+      try {
+        tt = new TaskTracker(conf);
+        isInitialized = true;
+      } catch (Throwable e) {
+        isDead = true;
+        tt = null;
+        LOG.error("task tracker " + trackerId + " crashed", e);
+      }
     }
         
     /**
@@ -139,9 +148,9 @@
      */
     public void run() {
       try {
-        tt = new TaskTracker(conf);
-        isInitialized = true;
-        tt.run();
+        if (tt != null) {
+          tt.run();
+        }
       } catch (Throwable e) {
         isDead = true;
         tt = null;
@@ -227,6 +236,11 @@
     result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
     result.set("mapred.job.tracker.http.address", 
                         "0.0.0.0:" + jobTrackerInfoPort);
+    if (ugi != null) {
+      result.set("mapred.system.dir", "/mapred/system");
+      UnixUserGroupInformation.saveToConf(result,
+          UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
+    }
     // for debugging have all task output sent to the test output
     JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL);
     return result;
@@ -264,12 +278,19 @@
                        int numTaskTrackers,
                        String namenode,
                        boolean taskTrackerFirst, int numDir) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode,
+        taskTrackerFirst, numDir, null);
+  }
 
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, boolean taskTrackerFirst,
+      int numDir, UnixUserGroupInformation ugi) throws IOException {
     this.jobTrackerPort = jobTrackerPort;
     this.taskTrackerPort = taskTrackerPort;
     this.jobTrackerInfoPort = 0;
     this.numTaskTrackers = numTaskTrackers;
     this.namenode = namenode;
+    this.ugi = ugi;
 
     // Create the JobTracker
     jobTracker = new JobTrackerRunner();

Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/PiEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/PiEstimator.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/PiEstimator.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/PiEstimator.java Sat Mar  8 11:56:35 2008
@@ -175,15 +175,15 @@
     try {
       JobClient.runJob(jobConf);
       Path inFile = new Path(outDir, "reduce-out");
-      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
-                                                           jobConf);
+      SequenceFile.Reader reader = new SequenceFile.Reader(
+          FileSystem.get(jobConf), inFile, jobConf);
       IntWritable numInside = new IntWritable();
       IntWritable numOutside = new IntWritable();
       reader.next(numInside, numOutside);
       reader.close();
       estimate = (double) (numInside.get()*4.0)/(numMaps*numPoints);
     } finally {
-      fileSys.delete(tmpDir);
+      FileSystem.get(jobConf).delete(tmpDir);
     }
     
     return estimate;

Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Sat Mar  8 11:56:35 2008
@@ -172,7 +172,6 @@
                    "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
           
     } finally {
-      if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();
       }
@@ -207,7 +206,6 @@
       
     } 
     finally {
-      if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();
       }

Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Sat Mar  8 11:56:35 2008
@@ -94,7 +94,6 @@
       // Run sort-validator to check if sort worked correctly
       runSortValidator(mr.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
     } finally {
-      if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();
       }

Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Sat Mar  8 11:56:35 2008
@@ -112,7 +112,7 @@
    * @param mr the map-reduce cluster
    * @param taskDirs the task ids that should be present
    */
-  private static void checkTaskDirectories(MiniMRCluster mr,
+  static void checkTaskDirectories(MiniMRCluster mr,
                                            String[] jobIds,
                                            String[] taskDirs) {
     mr.waitUntilIdle();
@@ -155,7 +155,55 @@
       assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
     }
   }
-  
+
+  static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+    LOG.info("runPI");
+    double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobconf);
+    double error = Math.abs(Math.PI - estimate);
+    assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+    checkTaskDirectories(mr, new String[]{}, new String[]{});
+  }
+
+  static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
+    LOG.info("runWordCount");
+    // Run a word count example
+    // Keeping tasks that match this pattern
+    jobConf.setKeepTaskFilesPattern("task_[^_]*_[0-9]*_m_000001_.*");
+    TestResult result;
+    final Path inDir = new Path("./wc/input");
+    final Path outDir = new Path("./wc/output");
+    result = launchWordCount(jobConf, inDir, outDir,
+                             "The quick brown fox\nhas many silly\n" + 
+                             "red fox sox\n",
+                             3, 1);
+    assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+                 "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
+    String jobid = result.job.getJobID();
+    String taskid = "task_" + jobid.substring(4) + "_m_000001_0";
+    checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid});
+    // test with maps=0
+    jobConf = mr.createJobConf();
+    result = launchWordCount(jobConf, inDir, outDir, "owen is oom", 0, 1);
+    assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
+    // Run a job with input and output going to localfs even though the 
+    // default fs is hdfs.
+    {
+      FileSystem localfs = FileSystem.getLocal(jobConf);
+      String TEST_ROOT_DIR =
+        new File(System.getProperty("test.build.data","/tmp"))
+        .toString().replace(' ', '+');
+      Path localIn = localfs.makeQualified
+                        (new Path(TEST_ROOT_DIR + "/local/in"));
+      Path localOut = localfs.makeQualified
+                        (new Path(TEST_ROOT_DIR + "/local/out"));
+      result = launchWordCount(jobConf, localIn, localOut,
+                               "all your base belong to us", 1, 1);
+      assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n", 
+                   result.output);
+      assertTrue("outputs on localfs", localfs.exists(localOut));
+    }
+  }
+
   public void testWithDFS() throws IOException {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
@@ -166,52 +214,11 @@
       Configuration conf = new Configuration();
       dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
-      mr = new MiniMRCluster(taskTrackers, fileSys.getName(), 1);
-      double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
-                                           mr.createJobConf());
-      double error = Math.abs(Math.PI - estimate);
-      assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
-      checkTaskDirectories(mr, new String[]{}, new String[]{});
-          
-      // Run a word count example
-      JobConf jobConf = mr.createJobConf();
-      // Keeping tasks that match this pattern
-      jobConf.setKeepTaskFilesPattern("task_[^_]*_[0-9]*_m_000001_.*");
-      TestResult result;
-      final Path inDir = new Path("/testing/wc/input");
-      final Path outDir = new Path("/testing/wc/output");
-      result = launchWordCount(jobConf, inDir, outDir,
-                               "The quick brown fox\nhas many silly\n" + 
-                               "red fox sox\n",
-                               3, 1);
-      assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
-                   "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
-      String jobid = result.job.getJobID();
-      String taskid = "task_" + jobid.substring(4) + "_m_000001_0";
-      checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid});
-      // test with maps=0
-      jobConf = mr.createJobConf();
-      result = launchWordCount(jobConf, inDir, outDir, "owen is oom", 0, 1);
-      assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
-      // Run a job with input and output going to localfs even though the 
-      // default fs is hdfs.
-      {
-        FileSystem localfs = FileSystem.getLocal(jobConf);
-        String TEST_ROOT_DIR =
-          new File(System.getProperty("test.build.data","/tmp"))
-          .toString().replace(' ', '+');
-        Path localIn = localfs.makeQualified
-                          (new Path(TEST_ROOT_DIR + "/local/in"));
-        Path localOut = localfs.makeQualified
-                          (new Path(TEST_ROOT_DIR + "/local/out"));
-        result = launchWordCount(jobConf, localIn, localOut,
-                                 "all your base belong to us", 1, 1);
-        assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n", 
-                     result.output);
-        assertTrue("outputs on localfs", localfs.exists(localOut));
-      }
+      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+
+      runPI(mr, mr.createJobConf());
+      runWordCount(mr, mr.createJobConf());
     } finally {
-      if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();
       }

Added: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=635061&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (added)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Sat Mar  8 11:56:35 2008
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.*;
+
+/**
+ * A JUnit test running Mini Map-Reduce jobs on Mini-DFS as distinct users.
+ */
+public class TestMiniMRWithDFSWithDistinctUsers extends TestCase {
+  static final long now = System.currentTimeMillis();
+  static final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true); 
+  static final UnixUserGroupInformation PI_UGI = createUGI("pi", false); 
+  static final UnixUserGroupInformation WC_UGI = createUGI("wc", false); 
+
+  static UnixUserGroupInformation createUGI(String name, boolean issuper) {
+    String username = name + now;
+    String group = issuper? "supergroup": username;
+    return UnixUserGroupInformation.createImmutable(
+        new String[]{username, group});
+  }
+  
+  static JobConf createJobConf(MiniMRCluster mr, UnixUserGroupInformation ugi) {
+    JobConf jobconf = mr.createJobConf();
+    UnixUserGroupInformation.saveToConf(jobconf,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
+    return jobconf;
+  }
+
+  static void mkdir(FileSystem fs, String dir) throws IOException {
+    Path p = new Path(dir);
+    fs.mkdirs(p);
+    fs.setPermission(p, new FsPermission((short)0777));
+  }
+
+  public void testDistinctUsers() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      UnixUserGroupInformation.saveToConf(conf,
+          UnixUserGroupInformation.UGI_PROPERTY_NAME, DFS_UGI);
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      FileSystem fs = dfs.getFileSystem();
+      mkdir(fs, "/user");
+      mkdir(fs, "/mapred");
+
+      UnixUserGroupInformation MR_UGI = createUGI(
+          UnixUserGroupInformation.login().getUserName(), true); 
+      mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(),
+          false, 1, MR_UGI);
+
+      JobConf pi = createJobConf(mr, PI_UGI);
+      TestMiniMRWithDFS.runPI(mr, pi);
+
+      JobConf wc = createJobConf(mr, WC_UGI);
+      TestMiniMRWithDFS.runWordCount(mr, wc);
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown();}
+    }
+  }
+}
\ No newline at end of file

Propchange: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java Sat Mar  8 11:56:35 2008
@@ -117,7 +117,6 @@
       assertTrue(result);
           
     } finally {
-      if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown(); }
     }

Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Sat Mar  8 11:56:35 2008
@@ -42,6 +42,11 @@
   private static final Log LOG =
     LogFactory.getLog(TestPipes.class.getName());
 
+  static void cleanup(FileSystem fs, Path p) throws IOException {
+    FileUtil.fullyDelete(fs, p);
+    assertFalse("output not cleaned up", fs.exists(p));
+  }
+
   public void testPipes() throws IOException {
     if (System.getProperty("compile.c++") == null) {
       LOG.info("compile.c++ is not defined, so skipping TestPipes");
@@ -49,7 +54,6 @@
     }
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
-    FileSystem fs = null;
     Path cppExamples = new Path(System.getProperty("install.c++.examples"));
     Path inputPath = new Path("/testing/in");
     Path outputPath = new Path("/testing/out");
@@ -57,20 +61,19 @@
       final int numSlaves = 2;
       Configuration conf = new Configuration();
       dfs = new MiniDFSCluster(conf, numSlaves, true, null);
-      fs = dfs.getFileSystem();
-      mr = new MiniMRCluster(numSlaves, fs.getName(), 1);
-      writeInputFile(fs, inputPath);
-      runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"), 
+      mr = new MiniMRCluster(numSlaves, dfs.getFileSystem().getName(), 1);
+      writeInputFile(dfs.getFileSystem(), inputPath);
+      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"), 
                  inputPath, outputPath, 3, 2, twoSplitOutput);
-      FileUtil.fullyDelete(fs, outputPath);
-      assertFalse("output not cleaned up", fs.exists(outputPath));
-      runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"), 
+      cleanup(dfs.getFileSystem(), outputPath);
+
+      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"), 
                  inputPath, outputPath, 3, 0, noSortOutput);
-      FileUtil.fullyDelete(fs, outputPath);
-      assertFalse("output not cleaned up", fs.exists(outputPath));
-      runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-part"),
+      cleanup(dfs.getFileSystem(), outputPath);
+
+      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-part"),
                  inputPath, outputPath, 3, 2, fixedPartitionOutput);
-      runNonPipedProgram(mr, fs, new Path(cppExamples, "bin/wordcount-nopipe"));
+      runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
       mr.waitUntilIdle();
     } finally {
       mr.shutdown();
@@ -126,25 +129,28 @@
     out.close();
   }
 
-  private void runProgram(MiniMRCluster mr, FileSystem fs, 
+  private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs, 
                           Path program, Path inputPath, Path outputPath,
                           int numMaps, int numReduces, String[] expectedResults
                          ) throws IOException {
     Path wordExec = new Path("/testing/bin/application");
-    FileUtil.fullyDelete(fs, wordExec.getParent());
-    fs.copyFromLocalFile(program, wordExec);                                         
     JobConf job = mr.createJobConf();
     job.setNumMapTasks(numMaps);
     job.setNumReduceTasks(numReduces);
-    Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
-    Submitter.setIsJavaRecordReader(job, true);
-    Submitter.setIsJavaRecordWriter(job, true);
-    job.setInputPath(inputPath);
-    job.setOutputPath(outputPath);
-    RunningJob result = Submitter.submitJob(job);
-    assertTrue("pipes job failed", result.isSuccessful());
+    {
+      FileSystem fs = dfs.getFileSystem();
+      FileUtil.fullyDelete(fs, wordExec.getParent());
+      fs.copyFromLocalFile(program, wordExec);                                         
+      Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
+      Submitter.setIsJavaRecordReader(job, true);
+      Submitter.setIsJavaRecordWriter(job, true);
+      job.setInputPath(inputPath);
+      job.setOutputPath(outputPath);
+      RunningJob result = Submitter.submitJob(job);
+      assertTrue("pipes job failed", result.isSuccessful());
+    }
     List<String> results = new ArrayList<String>();
-    for (Path p:fs.listPaths(outputPath)) {
+    for (Path p:dfs.getFileSystem().listPaths(outputPath)) {
       results.add(TestMiniMRWithDFS.readOutput(p, job));
     }
     assertEquals("number of reduces is wrong", 
@@ -163,7 +169,7 @@
    * @param program the program to run
    * @throws IOException
    */
-  private void runNonPipedProgram(MiniMRCluster mr, FileSystem dfs,
+  private void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
                                   Path program) throws IOException {
     JobConf job = mr.createJobConf();
     job.setInputFormat(WordCountInputFormat.class);
@@ -174,8 +180,11 @@
     Path outDir = new Path(testDir, "output");
     Path wordExec = new Path("/testing/bin/application");
     Path jobXml = new Path(testDir, "job.xml");
-    FileUtil.fullyDelete(dfs, wordExec.getParent());
-    dfs.copyFromLocalFile(program, wordExec);
+    {
+      FileSystem fs = dfs.getFileSystem();
+      FileUtil.fullyDelete(fs, wordExec.getParent());
+      fs.copyFromLocalFile(program, wordExec);
+    }
     DataOutputStream out = local.create(new Path(inDir, "part0"));
     out.writeBytes("i am a silly test\n");
     out.writeBytes("you are silly\n");
@@ -193,7 +202,7 @@
                                   "-input", inDir.toString(),
                                   "-output", outDir.toString(),
                                   "-program", 
-                                  dfs.makeQualified(wordExec).toString(),
+                        dfs.getFileSystem().makeQualified(wordExec).toString(),
                                   "-reduces", "2"});
     } catch (Exception e) {
       assertTrue("got exception: " + StringUtils.stringifyException(e), false);



Mime
View raw message