hadoop-common-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r783055 [3/6] - in /hadoop/core/branches/HADOOP-3628-2: ./ .eclipse.templates/ ivy/ lib/ lib/jsp-2.1/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-schedu...
Date: Tue, 09 Jun 2009 16:11:23 GMT
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java Tue Jun  9 16:11:19 2009
@@ -63,6 +63,17 @@
  * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
  * stores files on S3 in their
  * native form so they can be read by other S3 tools.
+ *
+ * A note about directories. S3 of course has no "native" support for them.
+ * The idiom we choose then is: for any directory created by this class,
+ * we use an empty object "#{dirpath}_$folder$" as a marker.
+ * Further, to interoperate with other S3 tools, we also accept the following:
+ *  - an object "#{dirpath}/' denoting a directory marker
+ *  - if there exists any objects with the prefix "#{dirpath}/", then the
+ *    directory is said to exist
+ *  - if both a file with the name of a directory and a marker for that
+ *    directory exists, then the *file masks the directory*, and the directory
+ *    is never returned.
  * </p>
  * @see org.apache.hadoop.fs.s3.S3FileSystem
  */
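For readers unfamiliar with the s3n layout, the idiom described in the new javadoc reduces to a handful of checks. The sketch below is illustrative only: a TreeSet of keys stands in for the bucket's key space, and the class is not part of this patch or of NativeS3FileSystem.

    import java.util.TreeSet;

    // Sketch only: decision rules from the javadoc above, against a fake key space.
    public class S3nDirectoryIdiomSketch {
      private final TreeSet<String> keys = new TreeSet<String>();

      boolean isDirectory(String dirKey) {
        if (keys.contains(dirKey + "_$folder$")) return true;  // marker written by this class
        if (keys.contains(dirKey + "/")) return true;          // marker written by other S3 tools
        // any object under the prefix "<dirpath>/" implies the directory exists
        String next = keys.ceiling(dirKey + "/");
        return next != null && next.startsWith(dirKey + "/");
      }

      boolean isFileMaskingDirectory(String key) {
        // a plain object with this exact key masks a directory of the same name
        return keys.contains(key);
      }
    }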
@@ -72,7 +83,6 @@
     LogFactory.getLog(NativeS3FileSystem.class);
   
   private static final String FOLDER_SUFFIX = "_$folder$";
-  private static final long MAX_S3_FILE_SIZE = 5 * 1024 * 1024 * 1024L;
   static final String PATH_DELIMITER = Path.SEPARATOR;
   private static final int S3_MAX_LISTING_LENGTH = 1000;
   
@@ -87,6 +97,7 @@
       this.key = key;
     }
     
+    @Override
     public synchronized int read() throws IOException {
       int result = in.read();
       if (result != -1) {
@@ -97,6 +108,7 @@
       }
       return result;
     }
+    @Override
     public synchronized int read(byte[] b, int off, int len)
       throws IOException {
       
@@ -110,18 +122,23 @@
       return result;
     }
 
+    @Override
     public void close() throws IOException {
       in.close();
     }
 
+    @Override
     public synchronized void seek(long pos) throws IOException {
       in.close();
+      LOG.info("Opening key '" + key + "' for reading at position '" + pos + "'");
       in = store.retrieve(key, pos);
       this.pos = pos;
     }
+    @Override
     public synchronized long getPos() throws IOException {
       return pos;
     }
+    @Override
     public boolean seekToNewSource(long targetPos) throws IOException {
       return false;
     }
@@ -142,6 +159,7 @@
       this.conf = conf;
       this.key = key;
       this.backupFile = newBackupFile();
+      LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
       try {
         this.digest = MessageDigest.getInstance("MD5");
         this.backupStream = new BufferedOutputStream(new DigestOutputStream(
@@ -176,6 +194,7 @@
       }
 
       backupStream.close();
+      LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
       
       try {
         byte[] md5Hash = digest == null ? null : digest.digest();
@@ -187,7 +206,7 @@
         super.close();
         closed = true;
       } 
-
+      LOG.info("OutputStream for key '" + key + "' upload complete");
     }
 
     @Override
@@ -199,8 +218,6 @@
     public void write(byte[] b, int off, int len) throws IOException {
       backupStream.write(b, off, len);
     }
-    
-    
   }
   
   private URI uri;
@@ -244,6 +261,7 @@
     Map<String, RetryPolicy> methodNameToPolicyMap =
       new HashMap<String, RetryPolicy>();
     methodNameToPolicyMap.put("storeFile", methodPolicy);
+    methodNameToPolicyMap.put("rename", methodPolicy);
     
     return (NativeFileSystemStore)
       RetryProxy.create(NativeFileSystemStore.class, store,
@@ -251,10 +269,19 @@
   }
   
   private static String pathToKey(Path path) {
+    if (path.toUri().getScheme() != null && "".equals(path.toUri().getPath())) {
+      // allow uris without trailing slash after bucket to refer to root,
+      // like s3n://mybucket
+      return "";
+    }
     if (!path.isAbsolute()) {
       throw new IllegalArgumentException("Path must be absolute: " + path);
     }
-    return path.toUri().getPath().substring(1); // remove initial slash
+    String ret = path.toUri().getPath().substring(1); // remove initial slash
+    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
+      ret = ret.substring(0, ret.length() -1);
+  }
+    return ret;
   }
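The effect of these pathToKey() changes is easiest to see with concrete URIs. The class below is a standalone restatement of the logic for illustration, using java.net.URI directly instead of Hadoop's Path; it is not the patch's (private) method.

    import java.net.URI;

    // Illustrative restatement of the revised pathToKey() behaviour.
    public class PathToKeySketch {
      static String pathToKey(URI uri) {
        if (uri.getScheme() != null && "".equals(uri.getPath())) {
          return "";                                 // s3n://mybucket -> bucket root
        }
        String p = uri.getPath();
        if (!p.startsWith("/")) {
          throw new IllegalArgumentException("Path must be absolute: " + uri);
        }
        String ret = p.substring(1);                 // drop the leading slash
        if (ret.endsWith("/") && ret.indexOf("/") != ret.length() - 1) {
          ret = ret.substring(0, ret.length() - 1);  // drop a trailing slash
        }
        return ret;
      }

      public static void main(String[] args) {
        System.out.println(pathToKey(URI.create("s3n://mybucket")));        // ""
        System.out.println(pathToKey(URI.create("s3n://mybucket/dir/f")));  // "dir/f"
      }
    }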
   
   private static Path keyToPath(String key) {
@@ -269,6 +296,7 @@
   }
 
   /** This optional operation is not yet supported. */
+  @Override
   public FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException {
     throw new IOException("Not supported");
@@ -286,39 +314,53 @@
         throw new IOException("File already exists: "+f);
       }
     } else {
-       if (flag.contains(CreateFlag.APPEND) && !flag.contains(CreateFlag.CREATE))
-         throw new IOException("File already exists: " + f.toString());
+      if (flag.contains(CreateFlag.APPEND) && !flag.contains(CreateFlag.CREATE))
+        throw new IOException("File already exists: " + f.toString());
     }
     
+    LOG.debug("Creating new file '" + f + "' in S3");
     Path absolutePath = makeAbsolute(f);
     String key = pathToKey(absolutePath);
     return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
         key, progress, bufferSize), statistics);
-    
   }
   
   @Override
-  public boolean delete(Path f, boolean recursive) throws IOException {
+  public boolean delete(Path f, boolean recurse) throws IOException {
     FileStatus status;
     try {
       status = getFileStatus(f);
     } catch (FileNotFoundException e) {
+      LOG.debug("Delete called for '" + f + "' but file does not exist, so returning false");
       return false;
     }
     Path absolutePath = makeAbsolute(f);
     String key = pathToKey(absolutePath);
     if (status.isDir()) {
-      FileStatus[] contents = listStatus(f);
-      if (!recursive && contents.length > 0) {
-        throw new IOException("Directory " + f.toString() + " is not empty.");
+      if (!recurse && listStatus(f).length > 0) {
+        throw new IOException("Can not delete " + f + " at is a not empty directory and recurse option is false");
       }
-      for (FileStatus p : contents) {
-        if (!delete(p.getPath(), recursive)) {
-          return false;
+
+      createParent(f);
+
+      LOG.debug("Deleting directory '" + f  + "'");
+      String priorLastKey = null;
+      do {
+        PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
+        for (FileMetadata file : listing.getFiles()) {
+          store.delete(file.getKey());
         }
+        priorLastKey = listing.getPriorLastKey();
+      } while (priorLastKey != null);
+
+      try {
+        store.delete(key + FOLDER_SUFFIX);
+      } catch (FileNotFoundException e) {
+        //this is fine, we don't require a marker
       }
-      store.delete(key + FOLDER_SUFFIX);
     } else {
+      LOG.debug("Deleting file '" + f + "'");
+      createParent(f);
       store.delete(key);
     }
     return true;
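The rewritten delete() walks the key space in pages of S3_MAX_LISTING_LENGTH (1000) entries, using the previous page's last key as a continuation token, and deletes as it goes. A minimal sketch of that pattern against a trimmed-down store interface; the interface and names here are illustrative, not the real NativeFileSystemStore contract.

    import java.io.IOException;
    import java.util.List;

    // Illustrative only: a cut-down paging store, not NativeFileSystemStore.
    interface PagedStore {
      Page list(String prefix, int maxKeys, String priorLastKey) throws IOException;
      void delete(String key) throws IOException;
    }

    class Page {
      List<String> keys;   // keys returned in this page
      String lastKey;      // continuation token; null when there are no more pages
    }

    class PagedDelete {
      static void deleteAllUnder(PagedStore store, String prefix) throws IOException {
        String priorLastKey = null;
        do {
          Page page = store.list(prefix, 1000, priorLastKey);
          for (String key : page.keys) {
            store.delete(key);            // remove every object in this page
          }
          priorLastKey = page.lastKey;    // pick up where this page ended
        } while (priorLastKey != null);
      }
    }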
@@ -326,7 +368,6 @@
 
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
-    
     Path absolutePath = makeAbsolute(f);
     String key = pathToKey(absolutePath);
     
@@ -334,23 +375,28 @@
       return newDirectory(absolutePath);
     }
     
+    LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
     FileMetadata meta = store.retrieveMetadata(key);
     if (meta != null) {
+      LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
       return newFile(meta, absolutePath);
     }
     if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
+      LOG.debug("getFileStatus returning 'directory' for key '" + key + "' as '"
+          + key + FOLDER_SUFFIX + "' exists");
       return newDirectory(absolutePath);
     }
     
+    LOG.debug("getFileStatus listing key '" + key + "'");
     PartialListing listing = store.list(key, 1);
     if (listing.getFiles().length > 0 ||
         listing.getCommonPrefixes().length > 0) {
+      LOG.debug("getFileStatus returning 'directory' for key '" + key + "' as it has contents");
       return newDirectory(absolutePath);
     }
     
-    throw new FileNotFoundException(absolutePath +
-        ": No such file or directory.");
-    
+    LOG.debug("getFileStatus could not find key '" + key + "'");
+    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
   }
 
   @Override
@@ -383,16 +429,20 @@
     Set<FileStatus> status = new TreeSet<FileStatus>();
     String priorLastKey = null;
     do {
-      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, 
-          priorLastKey);
+      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
       for (FileMetadata fileMetadata : listing.getFiles()) {
         Path subpath = keyToPath(fileMetadata.getKey());
         String relativePath = pathUri.relativize(subpath.toUri()).getPath();
-        if (relativePath.endsWith(FOLDER_SUFFIX)) {
-          status.add(newDirectory(new Path(absolutePath,
-              relativePath.substring(0,
-                  relativePath.indexOf(FOLDER_SUFFIX)))));
-        } else {
+
+        if (fileMetadata.getKey().equals(key + "/")) {
+          // this is just the directory we have been asked to list, so skip it
+        }
+        else if (relativePath.endsWith(FOLDER_SUFFIX)) {
+          status.add(newDirectory(new Path(
+              absolutePath,
+              relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
+        }
+        else {
           status.add(newFile(fileMetadata, subpath));
         }
       }
@@ -409,17 +459,16 @@
       return null;
     }
     
-    return status.toArray(new FileStatus[0]);
+    return status.toArray(new FileStatus[status.size()]);
   }
   
   private FileStatus newFile(FileMetadata meta, Path path) {
-    return new FileStatus(meta.getLength(), false, 1, MAX_S3_FILE_SIZE,
+    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
         meta.getLastModified(), path.makeQualified(this));
   }
   
   private FileStatus newDirectory(Path path) {
-    return new FileStatus(0, true, 1, MAX_S3_FILE_SIZE, 0,
-        path.makeQualified(this));
+    return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this));
   }
 
   @Override
@@ -443,10 +492,11 @@
       FileStatus fileStatus = getFileStatus(f);
       if (!fileStatus.isDir()) {
         throw new IOException(String.format(
-            "Can't make directory for path %s since it is a file.", f));
+            "Can't make directory for path '%s' since it is a file.", f));
 
       }
     } catch (FileNotFoundException e) {
+      LOG.debug("Making dir '" + f + "' in S3");
       String key = pathToKey(f) + FOLDER_SUFFIX;
       store.storeEmptyFile(key);    
     }
@@ -455,9 +505,11 @@
 
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    if (!exists(f)) {
-      throw new FileNotFoundException(f.toString());
+    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
+    if (fs.isDir()) {
+      throw new IOException("'" + f + "' is a directory");
     }
+    LOG.info("Opening '" + f + "' for reading");
     Path absolutePath = makeAbsolute(f);
     String key = pathToKey(absolutePath);
     return new FSDataInputStream(new BufferedFSInputStream(
@@ -467,47 +519,16 @@
   // rename() and delete() use this method to ensure that the parent directory
   // of the source does not vanish.
   private void createParent(Path path) throws IOException {
-      Path parent = path.getParent();
-      if (parent != null) {
-          String key = pathToKey(makeAbsolute(parent));
-          if (key.length() > 0) {
-              store.storeEmptyFile(key + FOLDER_SUFFIX);
-          }
+    Path parent = path.getParent();
+    if (parent != null) {
+      String key = pathToKey(makeAbsolute(parent));
+      if (key.length() > 0) {
+          store.storeEmptyFile(key + FOLDER_SUFFIX);
       }
+    }
   }
   
-  private boolean existsAndIsFile(Path f) throws IOException {
-    
-    Path absolutePath = makeAbsolute(f);
-    String key = pathToKey(absolutePath);
-    
-    if (key.length() == 0) {
-        return false;
-    }
-    
-    FileMetadata meta = store.retrieveMetadata(key);
-    if (meta != null) {
-        // S3 object with given key exists, so this is a file
-        return true;
-    }
-    
-    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
-        // Signifies empty directory
-        return false;
-    }
     
-    PartialListing listing = store.list(key, 1, null);
-    if (listing.getFiles().length > 0 ||
-        listing.getCommonPrefixes().length > 0) {
-        // Non-empty directory
-        return false;
-    }
-    
-    throw new FileNotFoundException(absolutePath +
-        ": No such file or directory");
-}
-
-
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
 
@@ -518,60 +539,79 @@
       return false;
     }
 
+    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
+
     // Figure out the final destination
     String dstKey;
     try {
-      boolean dstIsFile = existsAndIsFile(dst);
+      boolean dstIsFile = !getFileStatus(dst).isDir();
       if (dstIsFile) {
-        // Attempting to overwrite a file using rename()
+        LOG.debug(debugPreamble + "returning false as dst is an already existing file");
         return false;
       } else {
-        // Move to within the existent directory
+        LOG.debug(debugPreamble + "using dst as output directory");
         dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
       }
     } catch (FileNotFoundException e) {
-      // dst doesn't exist, so we can proceed
+      LOG.debug(debugPreamble + "using dst as output destination");
       dstKey = pathToKey(makeAbsolute(dst));
       try {
         if (!getFileStatus(dst.getParent()).isDir()) {
-          return false; // parent dst is a file
+          LOG.debug(debugPreamble + "returning false as dst parent exists and is a file");
+          return false;
         }
       } catch (FileNotFoundException ex) {
-        return false; // parent dst does not exist
+        LOG.debug(debugPreamble + "returning false as dst parent does not exist");
+        return false;
       }
     }
 
+    boolean srcIsFile;
     try {
-      boolean srcIsFile = existsAndIsFile(src);
-      if (srcIsFile) {
-        store.rename(srcKey, dstKey);
-      } else {
-        // Move the folder object
-        store.delete(srcKey + FOLDER_SUFFIX);
-        store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
+      srcIsFile = !getFileStatus(src).isDir();
+    } catch (FileNotFoundException e) {
+      LOG.debug(debugPreamble + "returning false as src does not exist");
+      return false;
+    }
+    if (srcIsFile) {
+      LOG.debug(debugPreamble + "src is file, so doing copy then delete in S3");
+      store.copy(srcKey, dstKey);
+      store.delete(srcKey);
+    } else {
+      LOG.debug(debugPreamble + "src is directory, so copying contents");
+      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
 
-        // Move everything inside the folder
-        String priorLastKey = null;
-        do {
-          PartialListing listing = store.listAll(srcKey, S3_MAX_LISTING_LENGTH,
-              priorLastKey);
-          for (FileMetadata file : listing.getFiles()) {
-            store.rename(file.getKey(), dstKey
-                + file.getKey().substring(srcKey.length()));
-          }
-          priorLastKey = listing.getPriorLastKey();
-        } while (priorLastKey != null);
-      }
+      List<String> keysToDelete = new ArrayList<String>();
+      String priorLastKey = null;
+      do {
+        PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
+        for (FileMetadata file : listing.getFiles()) {
+          keysToDelete.add(file.getKey());
+          store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
+        }
+        priorLastKey = listing.getPriorLastKey();
+      } while (priorLastKey != null);
 
-      createParent(src);
-      return true;
+      LOG.debug(debugPreamble + "all files in src copied, now removing src files");
+      for (String key: keysToDelete) {
+        store.delete(key);
+      }
 
-    } catch (FileNotFoundException e) {
-      // Source file does not exist;
-      return false;
+      try {
+        store.delete(srcKey + FOLDER_SUFFIX);
+      } catch (FileNotFoundException e) {
+        //this is fine, we don't require a marker
+      }
+      LOG.debug(debugPreamble + "done");
     }
-  }
 
+    return true;
+  }
+  
+  @Override
+  public long getDefaultBlockSize() {
+    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
+  }
 
   /**
    * Set the working directory to the given directory.
@@ -585,5 +625,4 @@
   public Path getWorkingDirectory() {
     return workingDir;
   }
-
 }
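Two behavioural notes on the block of changes above: rename() is now implemented as copy-then-delete on the store (this store interface offers no rename primitive, only copy and delete), and the block size reported for s3n files comes from the new fs.s3n.block.size property (default 64 MB) instead of the old 5 GB constant. A small sketch of overriding that property; the 128 MB value is purely illustrative.

    import org.apache.hadoop.conf.Configuration;

    public class S3nBlockSizeExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // 64 MB is the default wired into getDefaultBlockSize() above;
        // 128 MB here is just an example override.
        conf.setLong("fs.s3n.block.size", 128 * 1024 * 1024L);
        System.out.println(conf.getLong("fs.s3n.block.size", 0L));
      }
    }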

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/io/IOUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/io/IOUtils.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/io/IOUtils.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/io/IOUtils.java Tue Jun  9 16:11:19 2009
@@ -41,17 +41,8 @@
   public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
     throws IOException {
 
-    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
-    byte buf[] = new byte[buffSize];
     try {
-      int bytesRead = in.read(buf);
-      while (bytesRead >= 0) {
-        out.write(buf, 0, bytesRead);
-        if ((ps != null) && ps.checkError()) {
-          throw new IOException("Unable to write to output stream.");
-        }
-        bytesRead = in.read(buf);
-      }
+      copyBytes(in, out, buffSize);
     } finally {
       if(close) {
         out.close();
@@ -61,6 +52,27 @@
   }
   
   /**
+   * Copies from one stream to another.
+   * 
+   * @param in InputStream to read from
+   * @param out OutputStream to write to
+   * @param buffSize the size of the buffer 
+   */
+  public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
+    throws IOException {
+
+    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
+    byte buf[] = new byte[buffSize];
+    int bytesRead = in.read(buf);
+    while (bytesRead >= 0) {
+      out.write(buf, 0, bytesRead);
+      if ((ps != null) && ps.checkError()) {
+        throw new IOException("Unable to write to output stream.");
+      }
+      bytesRead = in.read(buf);
+    }
+  }
+  /**
    * Copies from one stream to another. <strong>closes the input and output streams 
    * at the end</strong>.
    * @param in InputStrem to read from
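A short usage sketch of the new three-argument copyBytes() overload, which copies without closing either stream and so leaves lifetime management to the caller; the file names below are placeholders.

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.io.IOUtils;

    public class CopyBytesExample {
      public static void main(String[] args) throws Exception {
        InputStream in = new FileInputStream(args[0]);
        OutputStream out = new FileOutputStream(args[1]);
        try {
          // copies the whole stream but leaves both streams open
          IOUtils.copyBytes(in, out, 4096);
        } finally {
          in.close();
          out.close();
        }
      }
    }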

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/metrics/ContextFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/metrics/ContextFactory.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/metrics/ContextFactory.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/metrics/ContextFactory.java Tue Jun  9 16:11:19 2009
@@ -188,16 +188,19 @@
   private void setAttributes() throws IOException {
     InputStream is = getClass().getResourceAsStream(PROPERTIES_FILE);
     if (is != null) {
-      Properties properties = new Properties();
-      properties.load(is);
-      //for (Object propertyNameObj : properties.keySet()) {
-      Iterator it = properties.keySet().iterator();
-      while (it.hasNext()) {
-        String propertyName = (String) it.next();
-        String propertyValue = properties.getProperty(propertyName);
-        setAttribute(propertyName, propertyValue);
+      try {
+        Properties properties = new Properties();
+        properties.load(is);
+        //for (Object propertyNameObj : properties.keySet()) {
+        Iterator it = properties.keySet().iterator();
+        while (it.hasNext()) {
+          String propertyName = (String) it.next();
+          String propertyValue = properties.getProperty(propertyName);
+          setAttribute(propertyName, propertyValue);
+        }
+      } finally {
+        is.close();
       }
-      is.close();
     }
   }
     

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/GenericOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/GenericOptionsParser.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/GenericOptionsParser.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/GenericOptionsParser.java Tue Jun  9 16:11:19 2009
@@ -205,8 +205,7 @@
     .withDescription("specify an application configuration file")
     .create("conf");
     Option property = OptionBuilder.withArgName("property=value")
-    .hasArgs()
-    .withArgPattern("=", 1)
+    .hasArg()
     .withDescription("use value for given property")
     .create('D');
     Option libjars = OptionBuilder.withArgName("paths")
@@ -281,9 +280,11 @@
     }
     if (line.hasOption('D')) {
       String[] property = line.getOptionValues('D');
-      for(int i=0; i<property.length-1; i=i+2) {
-        if (property[i]!=null)
-          conf.set(property[i], property[i+1]);
+      for(String prop : property) {
+        String[] keyval = prop.split("=");
+        if (keyval.length == 2) {
+          conf.set(keyval[0], keyval[1]);
+        }
       }
     }
     conf.setBoolean("mapred.used.genericoptionsparser", true);
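With this change, -D takes a single property=value token which is split on '=' before being applied to the Configuration. A minimal sketch of how that surfaces through GenericOptionsParser; the property name is just an example.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class GenericOptionsParserExample {
      public static void main(String[] args) throws Exception {
        // e.g. args = { "-D", "mapred.reduce.tasks=2", "in", "out" }
        Configuration conf = new Configuration();
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        String[] remaining = parser.getRemainingArgs();       // { "in", "out" }
        System.out.println(conf.get("mapred.reduce.tasks"));  // prints 2
      }
    }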

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java Tue Jun  9 16:11:19 2009
@@ -47,6 +47,10 @@
   private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
       .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
 
+  // to enable testing, this variable can be configured to point to a
+  // test directory.
+  private String procfsDir;
+  
   private Integer pid = -1;
   private boolean setsidUsed = false;
   private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
@@ -59,11 +63,29 @@
 
   public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
                                 long sigkillInterval) {
+    this(pid, setsidUsed, sigkillInterval, PROCFS);
+  }
+
+  /**
+   * Build a new process tree rooted at the pid.
+   * 
+   * This constructor is provided mainly for testing purposes, where
+   * the root of the proc file system can be adjusted.
+   * 
+   * @param pid root of the process tree
+   * @param setsidUsed true, if setsid was used for the root pid
+   * @param sigkillInterval how long to wait between a SIGTERM and SIGKILL 
+   *                        when killing a process tree
+   * @param procfsDir the root of a proc file system - only used for testing. 
+   */
+  public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
+                                long sigkillInterval, String procfsDir) {
     this.pid = getValidPID(pid);
     this.setsidUsed = setsidUsed;
     sleeptimeBeforeSigkill = sigkillInterval;
+    this.procfsDir = procfsDir;
   }
-
+  
   /**
    * Sets SIGKILL interval
    * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
@@ -108,13 +130,17 @@
       List<Integer> processList = getProcessList();
 
       Map<Integer, ProcessInfo> allProcessInfo = new HashMap<Integer, ProcessInfo>();
+      
+      // cache the processTree to get the age for processes
+      Map<Integer, ProcessInfo> oldProcs = 
+              new HashMap<Integer, ProcessInfo>(processTree);
       processTree.clear();
 
       ProcessInfo me = null;
       for (Integer proc : processList) {
         // Get information for each process
         ProcessInfo pInfo = new ProcessInfo(proc);
-        if (constructProcessInfo(pInfo) != null) {
+        if (constructProcessInfo(pInfo, procfsDir) != null) {
           allProcessInfo.put(proc, pInfo);
           if (proc.equals(this.pid)) {
             me = pInfo; // cache 'me'
@@ -150,6 +176,16 @@
         pInfoQueue.addAll(pInfo.getChildren());
       }
 
+      // update age values.
+      for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
+        ProcessInfo oldInfo = oldProcs.get(procs.getKey());
+        if (oldInfo != null) {
+          if (procs.getValue() != null) {
+            procs.getValue().updateAge(oldInfo);  
+          }
+        }
+      }
+
       if (LOG.isDebugEnabled()) {
         // Log.debug the ProcfsBasedProcessTree
         LOG.debug(this.toString());
@@ -269,15 +305,29 @@
    * @return cumulative virtual memory used by the process-tree in bytes.
    */
   public long getCumulativeVmem() {
+    // include all processes; all processes will be older than 0.
+    return getCumulativeVmem(0);
+  }
+
+  /**
+   * Get the cumulative virtual memory used by all the processes in the
+   * process-tree that are older than the passed in age.
+   * 
+   * @param olderThanAge processes above this age are included in the
+   *                      memory addition
+   * @return cumulative virtual memory used by the process-tree in bytes,
+   *          for processes older than this age.
+   */
+  public long getCumulativeVmem(int olderThanAge) {
     long total = 0;
     for (ProcessInfo p : processTree.values()) {
-      if (p != null) {
+      if ((p != null) && (p.getAge() > olderThanAge)) {
         total += p.getVmem();
       }
     }
     return total;
   }
-
+  
   private static Integer getValidPID(String pid) {
     Integer retPid = -1;
     try {
@@ -295,13 +345,13 @@
    * Get the list of all processes in the system.
    */
   private List<Integer> getProcessList() {
-    String[] processDirs = (new File(PROCFS)).list();
+    String[] processDirs = (new File(procfsDir)).list();
     List<Integer> processList = new ArrayList<Integer>();
 
     for (String dir : processDirs) {
       try {
         int pd = Integer.parseInt(dir);
-        if ((new File(PROCFS + dir)).isDirectory()) {
+        if ((new File(procfsDir, dir)).isDirectory()) {
           processList.add(Integer.valueOf(pd));
         }
       } catch (NumberFormatException n) {
@@ -319,12 +369,29 @@
    * same. Returns null on failing to read from procfs,
    */
   private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
+    return constructProcessInfo(pinfo, PROCFS);
+  }
+
+  /**
+   * Construct the ProcessInfo using the process' PID and procfs rooted at the
+   * specified directory and return the same. It is provided mainly to assist
+   * testing.
+   * 
+   * Returns null on failing to read from procfs.
+   *
+   * @param pinfo ProcessInfo that needs to be updated
+   * @param procfsDir root of the proc file system
+   * @return updated ProcessInfo, null on errors.
+   */
+  private static ProcessInfo constructProcessInfo(ProcessInfo pinfo, 
+                                                    String procfsDir) {
     ProcessInfo ret = null;
-    // Read "/proc/<pid>/stat" file
+    // Read "procfsDir/<pid>/stat" file - typically /proc/<pid>/stat
     BufferedReader in = null;
     FileReader fReader = null;
     try {
-      fReader = new FileReader(PROCFS + pinfo.getPid() + "/stat");
+      File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid()));
+      fReader = new FileReader(new File(pidDir, "/stat"));
       in = new BufferedReader(fReader);
     } catch (FileNotFoundException f) {
       // The process vanished in the interim!
@@ -338,7 +405,7 @@
       boolean mat = m.find();
       if (mat) {
         // Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize )
-        pinfo.update(m.group(2), Integer.parseInt(m.group(3)), Integer
+        pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), Integer
             .parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long
             .parseLong(m.group(7)));
       }
@@ -365,7 +432,6 @@
 
     return ret;
   }
-
   /**
    * Returns a string printing PIDs of process present in the
    * ProcfsBasedProcessTree. Output format : [pid pid ..]
@@ -391,10 +457,14 @@
     private Integer ppid; // parent process-id
     private Integer sessionId; // session-id
     private Long vmem; // virtual memory usage
+    // how many times has this process been seen alive
+    private int age; 
     private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
 
     public ProcessInfo(int pid) {
       this.pid = Integer.valueOf(pid);
+      // seeing this for the first time.
+      this.age = 1;
     }
 
     public Integer getPid() {
@@ -421,6 +491,10 @@
       return vmem;
     }
 
+    public int getAge() {
+      return age;
+    }
+    
     public boolean isParent(ProcessInfo p) {
       if (pid.equals(p.getPpid())) {
         return true;
@@ -428,7 +502,7 @@
       return false;
     }
 
-    public void update(String name, Integer ppid, Integer pgrpId,
+    public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
         Integer sessionId, Long vmem) {
       this.name = name;
       this.ppid = ppid;
@@ -437,6 +511,10 @@
       this.vmem = vmem;
     }
 
+    public void updateAge(ProcessInfo oldInfo) {
+      this.age = oldInfo.age + 1;
+    }
+    
     public boolean addChild(ProcessInfo p) {
       return children.add(p);
     }
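A sketch of how the new test hook and age-aware accounting fit together. The four-argument constructor and getCumulativeVmem(int) are added by this patch; the getProcessTree() refresh call is an assumption about the class's existing API, and the pid and directory below are placeholders.

    import org.apache.hadoop.util.ProcfsBasedProcessTree;

    public class ProcfsTreeSketch {
      public static void main(String[] args) {
        // Root the tree at a fake procfs directory (the new test hook).
        ProcfsBasedProcessTree tree =
            new ProcfsBasedProcessTree("100", false, 5000L, "/tmp/fake-procfs/");
        tree = tree.getProcessTree();               // assumed existing refresh call
        long all = tree.getCumulativeVmem();        // every live process (age > 0)
        long settled = tree.getCumulativeVmem(1);   // only processes seen in at least two scans
        System.out.println(all + " " + settled);
      }
    }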

Modified: hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/hdfs_design.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/hdfs_design.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/hdfs_design.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/hdfs_design.xml Tue Jun  9 16:11:19 2009
@@ -271,7 +271,7 @@
       <title> Accessibility </title>
       <!-- XXX Make an API section ? (HTTP is "web service" API?) -->
       <p>
-      HDFS can be accessed from applications in many different ways. Natively, HDFS provides a <a href="http://hadoop.apache.org/core/docs/current/api/">Java API</a> for applications to use. A C language wrapper for this Java API is also available. In addition, an HTTP browser can also be used to browse the files of an HDFS instance. Work is in progress to expose HDFS through the <acronym title="Web-based Distributed Authoring and Versioning">WebDAV</acronym> protocol. 
+      HDFS can be accessed from applications in many different ways. Natively, HDFS provides a <a href="ext:api/org/apache/hadoop/fs/filesystem">FileSystem Java API</a> for applications to use. A C language wrapper for this Java API is also available. In addition, an HTTP browser can be used to browse the files of an HDFS instance. Work is in progress to expose HDFS through the <acronym title="Web-based Distributed Authoring and Versioning">WebDAV</acronym> protocol. 
       </p>
 
       <section>
@@ -353,10 +353,7 @@
     <section>
       <title> References </title>
       <p>
-      HDFS Java API: 
-      <a href="http://hadoop.apache.org/core/docs/current/api/"> 
-        http://hadoop.apache.org/core/docs/current/api/
-      </a>
+      Hadoop <a href="ext:api">JavaDoc API</a>.
       </p>
       <p>
       HDFS source code: 

Modified: hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml Tue Jun  9 16:11:19 2009
@@ -610,8 +610,7 @@
       <li> <a href="http://wiki.apache.org/hadoop/FAQ">FAQ</a> from Hadoop Wiki.
       </li>
       <li>
-        Hadoop <a href="http://hadoop.apache.org/core/docs/current/api/">
-          JavaDoc API</a>.
+        Hadoop <a href="ext:api">JavaDoc API</a>.
       </li>
       <li>
         Hadoop User Mailing List : 

Modified: hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java Tue Jun  9 16:11:19 2009
@@ -18,14 +18,11 @@
 
 package org.apache.hadoop.examples;
 
-import java.io.BufferedReader;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.StringTokenizer;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,19 +30,18 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.MultiFileInputFormat;
-import org.apache.hadoop.mapred.MultiFileSplit;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.LongSumReducer;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
+import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -99,131 +95,129 @@
 
 
   /**
-   * To use {@link MultiFileInputFormat}, one should extend it, to return a 
-   * (custom) {@link RecordReader}. MultiFileInputFormat uses 
-   * {@link MultiFileSplit}s. 
+   * To use {@link CombineFileInputFormat}, one should extend it to return a 
+   * (custom) {@link RecordReader}. CombineFileInputFormat uses 
+   * {@link CombineFileSplit}s. 
    */
   public static class MyInputFormat 
-    extends MultiFileInputFormat<WordOffset, Text>  {
+    extends CombineFileInputFormat<WordOffset, Text>  {
 
-    @Override
-    public RecordReader<WordOffset,Text> getRecordReader(InputSplit split
-        , JobConf job, Reporter reporter) throws IOException {
-      return new MultiFileLineRecordReader(job, (MultiFileSplit)split);
+    public RecordReader<WordOffset,Text> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException {
+      return new CombineFileRecordReader<WordOffset, Text>(
+        (CombineFileSplit)split, context, CombineFileLineRecordReader.class);
     }
   }
 
   /**
-   * RecordReader is responsible from extracting records from the InputSplit. 
-   * This record reader accepts a {@link MultiFileSplit}, which encapsulates several 
-   * files, and no file is divided.
+   * RecordReader is responsible for extracting records from a chunk
+   * of the CombineFileSplit. 
    */
-  public static class MultiFileLineRecordReader 
-    implements RecordReader<WordOffset, Text> {
+  public static class CombineFileLineRecordReader 
+    extends RecordReader<WordOffset, Text> {
 
-    private MultiFileSplit split;
-    private long offset; //total offset read so far;
-    private long totLength;
+    private long startOffset; //offset of the chunk;
+    private long end; //end of the chunk;
+    private long pos; // current pos 
     private FileSystem fs;
-    private int count = 0;
-    private Path[] paths;
+    private Path path;
+    private WordOffset key;
+    private Text value;
     
-    private FSDataInputStream currentStream;
-    private BufferedReader currentReader;
+    private FSDataInputStream fileIn;
+    private LineReader reader;
     
-    public MultiFileLineRecordReader(Configuration conf, MultiFileSplit split)
-      throws IOException {
+    public CombineFileLineRecordReader(CombineFileSplit split,
+        TaskAttemptContext context, Integer index) throws IOException {
       
-      this.split = split;
-      fs = FileSystem.get(conf);
-      this.paths = split.getPaths();
-      this.totLength = split.getLength();
-      this.offset = 0;
+      fs = FileSystem.get(context.getConfiguration());
+      this.path = split.getPath(index);
+      this.startOffset = split.getOffset(index);
+      this.end = startOffset + split.getLength(index);
+      boolean skipFirstLine = false;
       
-      //open the first file
-      Path file = paths[count];
-      currentStream = fs.open(file);
-      currentReader = new BufferedReader(new InputStreamReader(currentStream));
+      //open the file
+      fileIn = fs.open(path);
+      if (startOffset != 0) {
+        skipFirstLine = true;
+        --startOffset;
+        fileIn.seek(startOffset);
+      }
+      reader = new LineReader(fileIn);
+      if (skipFirstLine) {  // skip first line and re-establish "startOffset".
+        startOffset += reader.readLine(new Text(), 0,
+                    (int)Math.min((long)Integer.MAX_VALUE, end - startOffset));
+      }
+      this.pos = startOffset;
     }
 
-    public void close() throws IOException { }
-
-    public long getPos() throws IOException {
-      long currentOffset = currentStream == null ? 0 : currentStream.getPos();
-      return offset + currentOffset;
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
     }
 
+    public void close() throws IOException { }
+
     public float getProgress() throws IOException {
-      return ((float)getPos()) / totLength;
+      if (startOffset == end) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, (pos - startOffset) / (float)(end - startOffset));
+      }
     }
 
-    public boolean next(WordOffset key, Text value) throws IOException {
-      if(count >= split.getNumPaths())
+    public boolean nextKeyValue() throws IOException {
+      if (key == null) {
+        key = new WordOffset();
+        key.fileName = path.getName();
+      }
+      key.offset = pos;
+      if (value == null) {
+        value = new Text();
+      }
+      int newSize = 0;
+      if (pos < end) {
+        newSize = reader.readLine(value);
+        pos += newSize;
+      }
+      if (newSize == 0) {
+        key = null;
+        value = null;
         return false;
-
-      /* Read from file, fill in key and value, if we reach the end of file,
-       * then open the next file and continue from there until all files are
-       * consumed.  
-       */
-      String line;
-      do {
-        line = currentReader.readLine();
-        if(line == null) {
-          //close the file
-          currentReader.close();
-          offset += split.getLength(count);
-          
-          if(++count >= split.getNumPaths()) //if we are done
-            return false;
-          
-          //open a new file
-          Path file = paths[count];
-          currentStream = fs.open(file);
-          currentReader=new BufferedReader(new InputStreamReader(currentStream));
-          key.fileName = file.getName();
-        }
-      } while(line == null);
-      //update the key and value
-      key.offset = currentStream.getPos();
-      value.set(line);
-      
-      return true;
+      } else {
+        return true;
+      }
     }
 
-    public WordOffset createKey() {
-      WordOffset wo = new WordOffset();
-      wo.fileName = paths[0].toString(); //set as the first file
-      return wo;
+    public WordOffset getCurrentKey() 
+        throws IOException, InterruptedException {
+      return key;
     }
 
-    public Text createValue() {
-      return new Text();
+    public Text getCurrentValue() throws IOException, InterruptedException {
+      return value;
     }
   }
 
   /**
    * This Mapper is similar to the one in {@link WordCount.MapClass}.
    */
-  public static class MapClass extends MapReduceBase
-    implements Mapper<WordOffset, Text, Text, IntWritable> {
-
+  public static class MapClass extends 
+      Mapper<WordOffset, Text, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();
     
-    public void map(WordOffset key, Text value,
-        OutputCollector<Text, IntWritable> output, Reporter reporter)
-        throws IOException {
+    public void map(WordOffset key, Text value, Context context)
+        throws IOException, InterruptedException {
       
       String line = value.toString();
       StringTokenizer itr = new StringTokenizer(line);
       while (itr.hasMoreTokens()) {
         word.set(itr.nextToken());
-        output.collect(word, one);
+        context.write(word, one);
       }
     }
   }
   
-  
   private void printUsage() {
     System.out.println("Usage : multifilewc <input_dir> <output>" );
   }
@@ -232,14 +226,15 @@
 
     if(args.length < 2) {
       printUsage();
-      return 1;
+      return 2;
     }
 
-    JobConf job = new JobConf(getConf(), MultiFileWordCount.class);
+    Job job = new Job(getConf());
     job.setJobName("MultiFileWordCount");
+    job.setJarByClass(MultiFileWordCount.class);
 
     //set the InputFormat of the job to our InputFormat
-    job.setInputFormat(MyInputFormat.class);
+    job.setInputFormatClass(MyInputFormat.class);
     
     // the keys are words (strings)
     job.setOutputKeyClass(Text.class);
@@ -249,15 +244,13 @@
     //use the defined mapper
     job.setMapperClass(MapClass.class);
     //use the WordCount Reducer
-    job.setCombinerClass(LongSumReducer.class);
-    job.setReducerClass(LongSumReducer.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
 
     FileInputFormat.addInputPaths(job, args[0]);
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
-    JobClient.runJob(job);
-    
-    return 0;
+    return job.waitForCompletion(true) ? 0 : 1;
   }
 
   public static void main(String[] args) throws Exception {

Modified: hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/Sort.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/Sort.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/Sort.java Tue Jun  9 16:11:19 2009
@@ -29,11 +29,15 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapred.lib.InputSampler;
-import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -42,7 +46,7 @@
  * other than use the framework to fragment and sort the input values.
  *
  * To run: bin/hadoop jar build/hadoop-examples.jar sort
- *            [-m <i>maps</i>] [-r <i>reduces</i>]
+ *            [-r <i>reduces</i>]
  *            [-inFormat <i>input format class</i>] 
  *            [-outFormat <i>output format class</i>] 
  *            [-outKey <i>output key class</i>] 
@@ -51,10 +55,10 @@
  *            <i>in-dir</i> <i>out-dir</i> 
  */
 public class Sort<K,V> extends Configured implements Tool {
-  private RunningJob jobResult = null;
+  private Job job = null;
 
   static int printUsage() {
-    System.out.println("sort [-m <maps>] [-r <reduces>] " +
+    System.out.println("sort [-r <reduces>] " +
                        "[-inFormat <input format class>] " +
                        "[-outFormat <output format class>] " + 
                        "[-outKey <output key class>] " +
@@ -62,7 +66,7 @@
                        "[-totalOrder <pcnt> <num samples> <max splits>] " +
                        "<input> <output>");
     ToolRunner.printGenericCommandUsage(System.out);
-    return -1;
+    return 2;
   }
 
   /**
@@ -73,16 +77,11 @@
    */
   public int run(String[] args) throws Exception {
 
-    JobConf jobConf = new JobConf(getConf(), Sort.class);
-    jobConf.setJobName("sorter");
-
-    jobConf.setMapperClass(IdentityMapper.class);        
-    jobConf.setReducerClass(IdentityReducer.class);
-
-    JobClient client = new JobClient(jobConf);
+    Configuration conf = getConf();
+    JobClient client = new JobClient(conf);
     ClusterStatus cluster = client.getClusterStatus();
     int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
-    String sort_reduces = jobConf.get("test.sort.reduces_per_host");
+    String sort_reduces = conf.get("test.sort.reduces_per_host");
     if (sort_reduces != null) {
        num_reduces = cluster.getTaskTrackers() * 
                        Integer.parseInt(sort_reduces);
@@ -97,9 +96,7 @@
     InputSampler.Sampler<K,V> sampler = null;
     for(int i=0; i < args.length; ++i) {
       try {
-        if ("-m".equals(args[i])) {
-          jobConf.setNumMapTasks(Integer.parseInt(args[++i]));
-        } else if ("-r".equals(args[i])) {
+        if ("-r".equals(args[i])) {
           num_reduces = Integer.parseInt(args[++i]);
         } else if ("-inFormat".equals(args[i])) {
           inputFormatClass = 
@@ -132,15 +129,21 @@
         return printUsage(); // exits
       }
     }
-
     // Set user-supplied (possibly default) job configs
-    jobConf.setNumReduceTasks(num_reduces);
+    job = new Job(conf);
+    job.setJobName("sorter");
+    job.setJarByClass(Sort.class);
 
-    jobConf.setInputFormat(inputFormatClass);
-    jobConf.setOutputFormat(outputFormatClass);
+    job.setMapperClass(Mapper.class);        
+    job.setReducerClass(Reducer.class);
 
-    jobConf.setOutputKeyClass(outputKeyClass);
-    jobConf.setOutputValueClass(outputValueClass);
+    job.setNumReduceTasks(num_reduces);
+
+    job.setInputFormatClass(inputFormatClass);
+    job.setOutputFormatClass(outputFormatClass);
+
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
 
     // Make sure there are exactly 2 parameters left.
     if (otherArgs.size() != 2) {
@@ -148,37 +151,37 @@
           otherArgs.size() + " instead of 2.");
       return printUsage();
     }
-    FileInputFormat.setInputPaths(jobConf, otherArgs.get(0));
-    FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.get(1)));
-
+    FileInputFormat.setInputPaths(job, otherArgs.get(0));
+    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
+    
     if (sampler != null) {
       System.out.println("Sampling input to effect total-order sort...");
-      jobConf.setPartitionerClass(TotalOrderPartitioner.class);
-      Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
-      inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
+      job.setPartitionerClass(TotalOrderPartitioner.class);
+      Path inputDir = FileInputFormat.getInputPaths(job)[0];
+      inputDir = inputDir.makeQualified(inputDir.getFileSystem(conf));
       Path partitionFile = new Path(inputDir, "_sortPartitioning");
-      TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
-      InputSampler.<K,V>writePartitionFile(jobConf, sampler);
+      TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
+      InputSampler.<K,V>writePartitionFile(job, sampler);
       URI partitionUri = new URI(partitionFile.toString() +
                                  "#" + "_sortPartitioning");
-      DistributedCache.addCacheFile(partitionUri, jobConf);
-      DistributedCache.createSymlink(jobConf);
+      DistributedCache.addCacheFile(partitionUri, conf);
+      DistributedCache.createSymlink(conf);
     }
 
     System.out.println("Running on " +
         cluster.getTaskTrackers() +
         " nodes to sort from " + 
-        FileInputFormat.getInputPaths(jobConf)[0] + " into " +
-        FileOutputFormat.getOutputPath(jobConf) +
+        FileInputFormat.getInputPaths(job)[0] + " into " +
+        FileOutputFormat.getOutputPath(job) +
         " with " + num_reduces + " reduces.");
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
-    jobResult = JobClient.runJob(jobConf);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
     Date end_time = new Date();
     System.out.println("Job ended: " + end_time);
     System.out.println("The job took " + 
         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
-    return 0;
+    return ret;
   }
 
 
@@ -192,7 +195,7 @@
    * Get the last job that was run using this instance.
    * @return the results of the last job that was run
    */
-  public RunningJob getResult() {
-    return jobResult;
+  public Job getResult() {
+    return job;
   }
 }

Modified: hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java Tue Jun  9 16:11:19 2009
@@ -28,8 +28,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.*;
 
 /**
@@ -43,21 +44,25 @@
  */
 public class DistributedPentomino extends Configured implements Tool {
 
+  private static final int PENT_DEPTH = 5;
+  private static final int PENT_WIDTH = 9;
+  private static final int PENT_HEIGHT = 10;
+  private static final int DEFAULT_MAPS = 2000;
+  
   /**
    * Each map takes a line, which represents a prefix move and finds all of 
    * the solutions that start with that prefix. The output is the prefix as
    * the key and the solution as the value.
    */
-  public static class PentMap extends MapReduceBase
-    implements Mapper<WritableComparable, Text, Text, Text> {
+  public static class PentMap extends 
+      Mapper<WritableComparable<?>, Text, Text, Text> {
     
     private int width;
     private int height;
     private int depth;
     private Pentomino pent;
     private Text prefixString;
-    private OutputCollector<Text, Text> output;
-    private Reporter reporter;
+    private Context context;
     
     /**
      * For each solution, generate the prefix and a string representation
@@ -72,10 +77,12 @@
       public void solution(List<List<Pentomino.ColumnName>> answer) {
         String board = Pentomino.stringifySolution(width, height, answer);
         try {
-          output.collect(prefixString, new Text("\n" + board));
-          reporter.incrCounter(pent.getCategory(answer), 1);
+          context.write(prefixString, new Text("\n" + board));
+          context.getCounter(pent.getCategory(answer)).increment(1);
         } catch (IOException e) {
           System.err.println(StringUtils.stringifyException(e));
+        } catch (InterruptedException ie) {
+          System.err.println(StringUtils.stringifyException(ie));
         }
       }
     }
@@ -85,11 +92,8 @@
      * will be selected for each column in order). Find all solutions with
      * that prefix.
      */
-    public void map(WritableComparable key, Text value,
-                    OutputCollector<Text, Text> output, Reporter reporter
-                    ) throws IOException {
-      this.output = output;
-      this.reporter = reporter;
+    public void map(WritableComparable<?> key, Text value, Context context)
+        throws IOException {
       prefixString = value;
       StringTokenizer itr = new StringTokenizer(prefixString.toString(), ",");
       int[] prefix = new int[depth];
@@ -102,10 +106,12 @@
     }
     
     @Override
-    public void configure(JobConf conf) {
-      depth = conf.getInt("pent.depth", -1);
-      width = conf.getInt("pent.width", -1);
-      height = conf.getInt("pent.height", -1);
+    public void setup(Context context) {
+      this.context = context;
+      Configuration conf = context.getConfiguration();
+      depth = conf.getInt("pent.depth", PENT_DEPTH);
+      width = conf.getInt("pent.width", PENT_WIDTH);
+      height = conf.getInt("pent.height", PENT_HEIGHT);
       pent = (Pentomino) 
         ReflectionUtils.newInstance(conf.getClass("pent.class", 
                                                   OneSidedPentomino.class), 
@@ -123,16 +129,17 @@
    * @param pent the puzzle 
    * @param depth the depth to explore when generating prefixes
    */
-  private static void createInputDirectory(FileSystem fs, 
+  private static long createInputDirectory(FileSystem fs, 
                                            Path dir,
                                            Pentomino pent,
                                            int depth
                                            ) throws IOException {
     fs.mkdirs(dir);
     List<int[]> splits = pent.getSplits(depth);
+    Path input = new Path(dir, "part1");
     PrintStream file = 
       new PrintStream(new BufferedOutputStream
-                      (fs.create(new Path(dir, "part1")), 64*1024));
+                      (fs.create(input), 64*1024));
     for(int[] prefix: splits) {
       for(int i=0; i < prefix.length; ++i) {
         if (i != 0) {
@@ -143,6 +150,7 @@
       file.print('\n');
     }
     file.close();
+    return fs.getFileStatus(input).getLen();
   }
   
   /**
@@ -151,57 +159,54 @@
    * Splits the job into 2000 maps and 1 reduce.
    */
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new DistributedPentomino(), args);
+    int res = ToolRunner.run(new Configuration(), 
+                new DistributedPentomino(), args);
     System.exit(res);
   }
 
   public int run(String[] args) throws Exception {
-    JobConf conf;
-    int depth = 5;
-    int width = 9;
-    int height = 10;
-    Class<? extends Pentomino> pentClass;
     if (args.length == 0) {
       System.out.println("pentomino <output>");
       ToolRunner.printGenericCommandUsage(System.out);
-      return -1;
+      return 2;
     }
-    
-    conf = new JobConf(getConf());
-    width = conf.getInt("pent.width", width);
-    height = conf.getInt("pent.height", height);
-    depth = conf.getInt("pent.depth", depth);
-    pentClass = conf.getClass("pent.class", OneSidedPentomino.class, Pentomino.class);
-    
+
+    Configuration conf = getConf();
+    int width = conf.getInt("pent.width", PENT_WIDTH);
+    int height = conf.getInt("pent.height", PENT_HEIGHT);
+    int depth = conf.getInt("pent.depth", PENT_DEPTH);
+    Class<? extends Pentomino> pentClass = conf.getClass("pent.class", 
+      OneSidedPentomino.class, Pentomino.class);
+    int numMaps = conf.getInt("mapred.map.tasks", DEFAULT_MAPS);
     Path output = new Path(args[0]);
     Path input = new Path(output + "_input");
     FileSystem fileSys = FileSystem.get(conf);
     try {
-      FileInputFormat.setInputPaths(conf, input);
-      FileOutputFormat.setOutputPath(conf, output);
-      conf.setJarByClass(PentMap.class);
+      Job job = new Job(conf);
+      FileInputFormat.setInputPaths(job, input);
+      FileOutputFormat.setOutputPath(job, output);
+      job.setJarByClass(PentMap.class);
       
-      conf.setJobName("dancingElephant");
+      job.setJobName("dancingElephant");
       Pentomino pent = ReflectionUtils.newInstance(pentClass, conf);
       pent.initialize(width, height);
-      createInputDirectory(fileSys, input, pent, depth);
+      long inputSize = createInputDirectory(fileSys, input, pent, depth);
+      // for forcing the number of maps
+      FileInputFormat.setMaxInputSplitSize(job, (inputSize/numMaps));
    
       // the keys are the prefix strings
-      conf.setOutputKeyClass(Text.class);
+      job.setOutputKeyClass(Text.class);
       // the values are puzzle solutions
-      conf.setOutputValueClass(Text.class);
+      job.setOutputValueClass(Text.class);
       
-      conf.setMapperClass(PentMap.class);        
-      conf.setReducerClass(IdentityReducer.class);
+      job.setMapperClass(PentMap.class);        
+      job.setReducerClass(Reducer.class);
       
-      conf.setNumMapTasks(2000);
-      conf.setNumReduceTasks(1);
+      job.setNumReduceTasks(1);
       
-      JobClient.runJob(conf);
+      return (job.waitForCompletion(true) ? 0 : 1);
       } finally {
       fileSys.delete(input, true);
     }
-    return 0;
   }
-
 }
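The key change to the job setup above is forcing the map count: with a single input file of inputSize bytes, capping the maximum split size at inputSize / numMaps yields roughly numMaps splits. A minimal sketch of that idiom; values and class name are illustrative.

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class ForcedMapCountSketch {
      // With one input file of inputSize bytes, roughly numMaps splits result.
      public static void configure(Job job, long inputSize, int numMaps) {
        FileInputFormat.setMaxInputSplitSize(job, inputSize / numMaps);
      }
    }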


