hadoop-mapreduce-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r885145 [26/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Date: Sat, 28 Nov 2009 20:26:22 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Sat Nov 28 20:26:01 2009
@@ -20,20 +20,24 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.RunJar;
 
 /**
@@ -59,9 +63,17 @@
     LogFactory.getLog(TrackerDistributedCacheManager.class);
 
   private final LocalFileSystem localFs;
+  
+  private LocalDirAllocator lDirAllocator;
+  
+  private Configuration trackerConf;
+  
+  private Random random = new Random();
 
   public TrackerDistributedCacheManager(Configuration conf) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
+    this.trackerConf = conf;
+    this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
   }
 
   /**
@@ -71,7 +83,7 @@
    * @param cache the cache to be localized, this should be specified as
    * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Configuration file which contains the filesystem
-   * @param baseDir The base cache Dir where you wnat to localize the 
+   * @param subDir The base cache subDir where you want to localize the 
    *  files/archives
    * @param fileStatus The file status on the dfs.
    * @param isArchive if the cache is an archive or a file. In case it is an
@@ -94,39 +106,59 @@
    * @throws IOException
    */
   Path getLocalCache(URI cache, Configuration conf,
-      Path baseDir, FileStatus fileStatus,
+      String subDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
       Path currentWorkDir, boolean honorSymLinkConf)
       throws IOException {
-    String cacheId = makeRelative(cache, conf);
+    String key = getKey(cache, conf, confFileStamp);
     CacheStatus lcacheStatus;
-    Path localizedPath;
+    Path localizedPath = null;
     synchronized (cachedArchives) {
-      lcacheStatus = cachedArchives.get(cacheId);
+      lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
         // was never localized
-        lcacheStatus = new CacheStatus(baseDir, 
-          new Path(baseDir, new Path(cacheId)));
-        cachedArchives.put(cacheId, lcacheStatus);
-      }
-
-      synchronized (lcacheStatus) {
+        String cachePath = new Path (subDir, 
+          new Path(String.valueOf(random.nextLong()),
+            makeRelative(cache, conf))).toString();
+        Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+          fileStatus.getLen(), trackerConf);
+        lcacheStatus = new CacheStatus(
+          new Path(localPath.toString().replace(cachePath, "")), localPath); 
+        cachedArchives.put(key, lcacheStatus);
+      }
+
+      //mark the cache for use. 
+      lcacheStatus.refcount++;
+    }
+    
+    // do the localization, after releasing the global lock
+    synchronized (lcacheStatus) {
+      if (!lcacheStatus.isInited()) {
         localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
-            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
-        lcacheStatus.refcount++;
+            fileStatus, isArchive);
+        lcacheStatus.initComplete();
+      } else {
+        localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+            lcacheStatus, fileStatus, isArchive);
       }
+      createSymlink(conf, cache, lcacheStatus, isArchive,
+          currentWorkDir, honorSymLinkConf);
     }
 
     // try deleting stuff if you can
     long size = 0;
-    synchronized (baseDirSize) {
-      Long get = baseDirSize.get(baseDir);
-      if ( get != null ) {
-      size = get.longValue();
+    synchronized (lcacheStatus) {
+      synchronized (baseDirSize) {
+        Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+        if ( get != null ) {
+         size = get.longValue();
+        } else {
+          LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+        }
       }
     }
     // setting the cache size to a default of 10GB
-    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
+    long allowedSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE, DEFAULT_CACHE_SIZE);
     if (allowedSize < size) {
       // try some cache deletions
       deleteCache(conf);
@@ -142,40 +174,58 @@
    * is contained in.
    * @throws IOException
    */
-  void releaseCache(URI cache, Configuration conf)
+  void releaseCache(URI cache, Configuration conf, long timeStamp)
     throws IOException {
-    String cacheId = makeRelative(cache, conf);
+    String key = getKey(cache, conf, timeStamp);
     synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null)
+      CacheStatus lcacheStatus = cachedArchives.get(key);
+      if (lcacheStatus == null) {
+        LOG.warn("Cannot find localized cache: " + cache + 
+                 " (key: " + key + ") in releaseCache!");
         return;
-      synchronized (lcacheStatus) {
-        lcacheStatus.refcount--;
       }
+      
+      // decrement ref count 
+      lcacheStatus.refcount--;
     }
   }
 
   // To delete the caches which have a refcount of zero
 
   private void deleteCache(Configuration conf) throws IOException {
+    Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
     // try deleting cache Status with refcount of zero
     synchronized (cachedArchives) {
       for (Iterator<String> it = cachedArchives.keySet().iterator(); 
           it.hasNext();) {
         String cacheId = it.next();
         CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        synchronized (lcacheStatus) {
-          if (lcacheStatus.refcount == 0) {
-            // delete this cache entry
-            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-            synchronized (baseDirSize) {
-              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
-              if ( dirSize != null ) {
-              dirSize -= lcacheStatus.size;
-              baseDirSize.put(lcacheStatus.baseDir, dirSize);
-              }
-            }
-            it.remove();
+        
+        // if reference count is zero 
+        // mark the cache for deletion
+        if (lcacheStatus.refcount == 0) {
+          // delete this cache entry from the global list 
+          // and mark the localized file for deletion
+          deleteSet.add(lcacheStatus);
+          it.remove();
+        }
+      }
+    }
+    
+    // do the deletion, after releasing the global lock
+    for (CacheStatus lcacheStatus : deleteSet) {
+      synchronized (lcacheStatus) {
+        FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+        LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+        // decrement the size of the cache from baseDirSize
+        synchronized (baseDirSize) {
+          Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+          if ( dirSize != null ) {
+            dirSize -= lcacheStatus.size;
+            baseDirSize.put(lcacheStatus.baseDir, dirSize);
+          } else {
+            LOG.warn("Cannot find record of the baseDir: " + 
+                     lcacheStatus.baseDir + " during delete!");
           }
         }
       }
@@ -208,144 +258,136 @@
     return path;
   }
 
-  private Path cacheFilePath(Path p) {
-    return new Path(p, p.getName());
+  String getKey(URI cache, Configuration conf, long timeStamp) 
+      throws IOException {
+    return makeRelative(cache, conf) + String.valueOf(timeStamp);
   }
+  
+  /**
+   * Returns mtime of a given cache file on hdfs.
+   * 
+   * @param conf configuration
+   * @param cache cache file 
+   * @return mtime of a given cache file on hdfs
+   * @throws IOException
+   */
+  static long getTimestamp(Configuration conf, URI cache)
+    throws IOException {
+    FileSystem fileSystem = FileSystem.get(cache, conf);
+    Path filePath = new Path(cache.getPath());
 
-  // the method which actually copies the caches locally and unjars/unzips them
-  // and does chmod for the files
-  private Path localizeCache(Configuration conf,
-                                    URI cache, long confFileStamp,
-                                    CacheStatus cacheStatus,
-                                    FileStatus fileStatus,
-                                    boolean isArchive,
-                                    Path currentWorkDir, 
-                                    boolean honorSymLinkConf)
-  throws IOException {
+    return fileSystem.getFileStatus(filePath).getModificationTime();
+  }
+
+  private Path checkCacheStatusValidity(Configuration conf,
+      URI cache, long confFileStamp,
+      CacheStatus cacheStatus,
+      FileStatus fileStatus,
+      boolean isArchive
+      ) throws IOException {
+    FileSystem fs = FileSystem.get(cache, conf);
+    // Has to be 
+    if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
+                          cacheStatus, fileStatus)) {
+      throw new IOException("Stale cache file: " + cacheStatus.localLoadPath + 
+                            " for cache-file: " + cache);
+    }
+
+    LOG.info(String.format("Using existing cache of %s->%s",
+        cache.toString(), cacheStatus.localLoadPath));
+    return cacheStatus.localLoadPath;
+  }
+  
+  private void createSymlink(Configuration conf, URI cache,
+      CacheStatus cacheStatus, boolean isArchive,
+      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
     boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
     if(cache.getFragment() == null) {
       doSymlink = false;
     }
-    FileSystem fs = FileSystem.get(cache, conf);
     String link = 
       currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
-                           cacheStatus, fileStatus)) {
-      LOG.info(String.format("Using existing cache of %s->%s",
-          cache.toString(), cacheStatus.localLoadPath));
-      if (isArchive) {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheStatus.localLoadPath.toString(),
-                             link);
-        }
-
-        return cacheStatus.localLoadPath;
-      }
-      else {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(
-              cacheFilePath(cacheStatus.localLoadPath).toString(), link);
-        }
-        return cacheFilePath(cacheStatus.localLoadPath);
+    if (doSymlink){
+      if (!flink.exists()) {
+        FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
       }
+    }
+  }
+  
+  // the method which actually copies the caches locally and unjars/unzips them
+  // and does chmod for the files
+  private Path localizeCache(Configuration conf,
+                                    URI cache, long confFileStamp,
+                                    CacheStatus cacheStatus,
+                                    FileStatus fileStatus,
+                                    boolean isArchive)
+  throws IOException {
+    FileSystem fs = FileSystem.get(cache, conf);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path parchive = null;
+    if (isArchive) {
+      parchive = new Path(cacheStatus.localLoadPath,
+        new Path(cacheStatus.localLoadPath.getName()));
     } else {
+      parchive = cacheStatus.localLoadPath;
+    }
 
-      // remove the old archive
-      // if the old archive cannot be removed since it is being used by another
-      // job
-      // return null
-      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
-        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
-                              + " is in use and cannot be refreshed");
-
-      FileSystem localFs = FileSystem.getLocal(conf);
-      localFs.delete(cacheStatus.localLoadPath, true);
-      synchronized (baseDirSize) {
-      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-      if ( dirSize != null ) {
-        dirSize -= cacheStatus.size;
-        baseDirSize.put(cacheStatus.baseDir, dirSize);
-      }
-      }
-      Path parchive = new Path(cacheStatus.localLoadPath,
-                               new Path(cacheStatus.localLoadPath.getName()));
-
-      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
-        throw new IOException("Mkdirs failed to create directory " +
-                              cacheStatus.localLoadPath.toString());
-      }
-
-      String cacheId = cache.getPath();
-      fs.copyToLocalFile(new Path(cacheId), parchive);
-      if (isArchive) {
-        String tmpArchive = parchive.toString().toLowerCase();
-        File srcFile = new File(parchive.toString());
-        File destDir = new File(parchive.getParent().toString());
-        LOG.info(String.format("Extracting %s to %s",
-            srcFile.toString(), destDir.toString()));
-        if (tmpArchive.endsWith(".jar")) {
-          RunJar.unJar(srcFile, destDir);
-        } else if (tmpArchive.endsWith(".zip")) {
-          FileUtil.unZip(srcFile, destDir);
-        } else if (isTarFile(tmpArchive)) {
-          FileUtil.unTar(srcFile, destDir);
-        } else {
-          LOG.warn(String.format(
+    if (!localFs.mkdirs(parchive.getParent())) {
+      throw new IOException("Mkdirs failed to create directory " +
+          cacheStatus.localLoadPath.toString());
+    }
+
+    String cacheId = cache.getPath();
+    fs.copyToLocalFile(new Path(cacheId), parchive);
+    if (isArchive) {
+      String tmpArchive = parchive.toString().toLowerCase();
+      File srcFile = new File(parchive.toString());
+      File destDir = new File(parchive.getParent().toString());
+      LOG.info(String.format("Extracting %s to %s",
+          srcFile.toString(), destDir.toString()));
+      if (tmpArchive.endsWith(".jar")) {
+        RunJar.unJar(srcFile, destDir);
+      } else if (tmpArchive.endsWith(".zip")) {
+        FileUtil.unZip(srcFile, destDir);
+      } else if (isTarFile(tmpArchive)) {
+        FileUtil.unTar(srcFile, destDir);
+      } else {
+        LOG.warn(String.format(
             "Cache file %s specified as archive, but not valid extension.", 
             srcFile.toString()));
-          // else will not do anyhting
-          // and copy the file into the dir as it is
-        }
+        // else will not do anything
+        // and copy the file into the dir as it is
       }
+    }
 
-      long cacheSize = 
-        FileUtil.getDU(new File(parchive.getParent().toString()));
-      cacheStatus.size = cacheSize;
-      synchronized (baseDirSize) {
-        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-        if( dirSize == null ) {
-          dirSize = Long.valueOf(cacheSize);
-        } else {
-          dirSize += cacheSize;
-        }
-        baseDirSize.put(cacheStatus.baseDir, dirSize);
+    long cacheSize = 
+      FileUtil.getDU(new File(parchive.getParent().toString()));
+    cacheStatus.size = cacheSize;
+    synchronized (baseDirSize) {
+      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+      if( dirSize == null ) {
+        dirSize = Long.valueOf(cacheSize);
+      } else {
+        dirSize += cacheSize;
       }
+      baseDirSize.put(cacheStatus.baseDir, dirSize);
+    }
 
-      // do chmod here
-      try {
-        //Setting recursive permission to grant everyone read and execute
-        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-      } catch(InterruptedException e) {
+    // do chmod here
+    try {
+      //Setting recursive permission to grant everyone read and execute
+      FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+    } catch(InterruptedException e) {
       LOG.warn("Exception in chmod" + e.toString());
-      }
-
-      // update cacheStatus to reflect the newly cached file
-      cacheStatus.currentStatus = true;
-      cacheStatus.mtime = DistributedCache.getTimestamp(conf, cache);
-
-      LOG.info(String.format("Cached %s as %s",
-          cache.toString(), cacheStatus.localLoadPath));
     }
 
-    if (isArchive){
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheStatus.localLoadPath.toString(),
-                           link);
-      }
-      return cacheStatus.localLoadPath;
-    }
-    else {
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
-                           link);
-      }
-      return cacheFilePath(cacheStatus.localLoadPath);
-    }
+    // update cacheStatus to reflect the newly cached file
+    cacheStatus.mtime = getTimestamp(conf, cache);
+
+    LOG.info(String.format("Cached %s as %s",
+             cache.toString(), cacheStatus.localLoadPath));
+    return cacheStatus.localLoadPath;
   }
 
   private static boolean isTarFile(String filename) {
@@ -359,28 +401,22 @@
                                           CacheStatus lcacheStatus,
                                           FileStatus fileStatus)
   throws IOException {
-    // check for existence of the cache
-    if (lcacheStatus.currentStatus == false) {
-      return false;
+    long dfsFileStamp;
+    if (fileStatus != null) {
+      dfsFileStamp = fileStatus.getModificationTime();
     } else {
-      long dfsFileStamp;
-      if (fileStatus != null) {
-        dfsFileStamp = fileStatus.getModificationTime();
-      } else {
-        dfsFileStamp = DistributedCache.getTimestamp(conf, cache);
-      }
+      dfsFileStamp = getTimestamp(conf, cache);
+    }
 
-      // ensure that the file on hdfs hasn't been modified since the job started
-      if (dfsFileStamp != confFileStamp) {
-        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
-        throw new IOException("File: " + cache +
-                              " has changed on HDFS since job started");
-      }
+    // ensure that the file on hdfs hasn't been modified since the job started
+    if (dfsFileStamp != confFileStamp) {
+      LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+      throw new IOException("File: " + cache +
+                            " has changed on HDFS since job started");
+    }
 
-      if (dfsFileStamp != lcacheStatus.mtime) {
-        // needs refreshing
-        return false;
-      }
+    if (dfsFileStamp != lcacheStatus.mtime) {
+      return false;
     }
 
     return true;
@@ -421,9 +457,6 @@
   }
 
   private static class CacheStatus {
-    // false, not loaded yet, true is loaded
-    boolean currentStatus;
-
     // the local load path of this cache
     Path localLoadPath;
 
@@ -439,15 +472,31 @@
     // the cache-file modification time
     long mtime;
 
+    // is it initialized ?
+    boolean inited = false;
+    
     public CacheStatus(Path baseDir, Path localLoadPath) {
       super();
-      this.currentStatus = false;
       this.localLoadPath = localLoadPath;
       this.refcount = 0;
       this.mtime = -1;
       this.baseDir = baseDir;
       this.size = 0;
     }
+    
+    Path getBaseDir(){
+      return this.baseDir;
+    }
+    
+    // mark it as initialized
+    void initComplete() {
+      inited = true;
+    }
+
+    // is it initialized?
+    boolean isInited() {
+      return inited;
+    }
   }
 
   /**
@@ -488,25 +537,113 @@
     if (tarchives != null) {
       StringBuffer archiveTimestamps = 
         new StringBuffer(String.valueOf(
-            DistributedCache.getTimestamp(job, tarchives[0])));
+            getTimestamp(job, tarchives[0])));
       for (int i = 1; i < tarchives.length; i++) {
         archiveTimestamps.append(",");
         archiveTimestamps.append(String.valueOf(
-            DistributedCache.getTimestamp(job, tarchives[i])));
+            getTimestamp(job, tarchives[i])));
       }
-      DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
+      setArchiveTimestamps(job, archiveTimestamps.toString());
     }
   
     URI[] tfiles = DistributedCache.getCacheFiles(job);
     if (tfiles != null) {
       StringBuffer fileTimestamps = new StringBuffer(String.valueOf(
-          DistributedCache.getTimestamp(job, tfiles[0])));
+          getTimestamp(job, tfiles[0])));
       for (int i = 1; i < tfiles.length; i++) {
         fileTimestamps.append(",");
         fileTimestamps.append(String.valueOf(
-            DistributedCache.getTimestamp(job, tfiles[i])));
+            getTimestamp(job, tfiles[i])));
       }
-      DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
+      setFileTimestamps(job, fileTimestamps.toString());
     }
   }
+  
+  /**
+   * This method checks if there is a conflict in the fragment names 
+   * of the uris. Also makes sure that each uri has a fragment. It 
+   * is only to be called if you want to create symlinks for 
+   * the various archives and files.  May be used by user code.
+   * @param uriFiles The uri array of urifiles
+   * @param uriArchives the uri array of uri archives
+   */
+  public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
+    if ((uriFiles == null) && (uriArchives == null)){
+      return true;
+    }
+    if (uriFiles != null){
+      for (int i = 0; i < uriFiles.length; i++){
+        String frag1 = uriFiles[i].getFragment();
+        if (frag1 == null)
+          return false;
+        for (int j=i+1; j < uriFiles.length; j++){
+          String frag2 = uriFiles[j].getFragment();
+          if (frag2 == null)
+            return false;
+          if (frag1.equalsIgnoreCase(frag2))
+            return false;
+        }
+        if (uriArchives != null){
+          for (int j = 0; j < uriArchives.length; j++){
+            String frag2 = uriArchives[j].getFragment();
+            if (frag2 == null){
+              return false;
+            }
+            if (frag1.equalsIgnoreCase(frag2))
+              return false;
+            for (int k=j+1; k < uriArchives.length; k++){
+              String frag3 = uriArchives[k].getFragment();
+              if (frag3 == null)
+                return false;
+              if (frag2.equalsIgnoreCase(frag3))
+                return false;
+            }
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * This is to check the timestamp of the archives to be localized.
+   * 
+   * @param conf Configuration which stores the timestamp's
+   * @param timestamps comma separated list of timestamps of archives.
+   * The order should be the same as the order in which the archives are added.
+   */
+  static void setArchiveTimestamps(Configuration conf, String timestamps) {
+    conf.set(JobContext.CACHE_ARCHIVES_TIMESTAMPS, timestamps);
+  }
+
+  /**
+   * This is to check the timestamp of the files to be localized.
+   * 
+   * @param conf Configuration which stores the timestamp's
+   * @param timestamps comma separated list of timestamps of files.
+   * The order should be the same as the order in which the files are added.
+   */
+  static void setFileTimestamps(Configuration conf, String timestamps) {
+    conf.set(JobContext.CACHE_FILE_TIMESTAMPS, timestamps);
+  }
+  
+  /**
+   * Set the conf to contain the location for localized archives.
+   * 
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma separated list of local archives
+   */
+  static void setLocalArchives(Configuration conf, String str) {
+    conf.set(JobContext.CACHE_LOCALARCHIVES, str);
+  }
+
+  /**
+   * Set the conf to contain the location for localized files.
+   * 
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma separated list of local files
+   */
+  static void setLocalFiles(Configuration conf, String str) {
+    conf.set(JobContext.CACHE_LOCALFILES, str);
+  }
 }
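
Taken together, the changes above replace the bare relative path as the cache key with "relative path + timestamp", let LocalDirAllocator choose the target directory across the configured local dirs, bump the reference count under the global cachedArchives lock, and perform the copy/unpack under the per-entry lock only when the entry has never been initialized. A minimal caller-side sketch follows; it is not part of this commit, all paths and URIs are illustrative, and it lives in the org.apache.hadoop.mapreduce.filecache package because getLocalCache(), releaseCache() and getTimestamp() are package-private.

    package org.apache.hadoop.mapreduce.filecache;

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CacheUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        TrackerDistributedCacheManager manager =
            new TrackerDistributedCacheManager(conf);

        // Hypothetical cache file; the URI fragment names the symlink.
        URI cache = new URI("hdfs://namenode:8020/libs/lookup.zip#lookup");
        FileStatus status = FileSystem.get(cache, conf)
            .getFileStatus(new Path(cache.getPath()));
        long stamp = TrackerDistributedCacheManager.getTimestamp(conf, cache);

        // Localize under the "archive" sub-directory: the refcount is bumped
        // while the global lock is held, and the copy/unzip runs under the
        // per-entry lock only if this key was never localized before.
        Path localized = manager.getLocalCache(cache, conf, "archive",
            status,
            true,    // treat as an archive and unpack it
            stamp,   // timestamp recorded at job-submission time
            new Path("/local/task/workdir"),
            true);   // honor the symlink setting in the job conf
        System.out.println("Localized to " + localized);

        // Drop the reference; entries whose refcount reaches zero become
        // candidates for deleteCache() once the size limit is exceeded.
        manager.releaseCache(cache, conf, stamp);
      }
    }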

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/UniqValueCount.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/UniqValueCount.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/UniqValueCount.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/UniqValueCount.java Sat Nov 28 20:26:01 2009
@@ -28,6 +28,8 @@
  * 
  */
 public class UniqValueCount implements ValueAggregator<Object> {
+  public static final String MAX_NUM_UNIQUE_VALUES = 
+    "mapreduce.aggregate.max.num.unique.values";
 
   private TreeMap<Object, Object> uniqItems = null;
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorBaseDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorBaseDescriptor.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorBaseDescriptor.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorBaseDescriptor.java Sat Nov 28 20:26:01 2009
@@ -23,6 +23,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
 
 /** 
  * This class implements the common functionalities of 
@@ -156,6 +157,6 @@
    * @param conf a configuration object
    */
   public void configure(Configuration conf) {
-    this.inputFile = conf.get("map.input.file");
+    this.inputFile = conf.get(JobContext.MAP_INPUT_FILE);
   }
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorCombiner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorCombiner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorCombiner.java Sat Nov 28 20:26:01 2009
@@ -45,7 +45,7 @@
     int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
     String type = keyStr.substring(0, pos);
     long uniqCount = context.getConfiguration().
-      getLong("aggregate.max.num.unique.values", Long.MAX_VALUE);
+      getLong(UniqValueCount.MAX_NUM_UNIQUE_VALUES, Long.MAX_VALUE);
     ValueAggregator aggregator = ValueAggregatorBaseDescriptor
       .generateValueAggregator(type, uniqCount);
     for (Text val : values) {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJob.java Sat Nov 28 20:26:01 2009
@@ -26,6 +26,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -154,9 +155,9 @@
     if (specFile != null) {
       conf.addResource(specFile);
     }
-    String userJarFile = conf.get("user.jar.file");
+    String userJarFile = conf.get(ValueAggregatorJobBase.USER_JAR);
     if (userJarFile != null) {
-      conf.set("mapred.jar", userJarFile);
+      conf.set(JobContext.JAR, userJarFile);
     }
 
     Job theJob = new Job(conf);
@@ -192,10 +193,10 @@
   public static Configuration setAggregatorDescriptors(
       Class<? extends ValueAggregatorDescriptor>[] descriptors) {
     Configuration conf = new Configuration();
-    conf.setInt("aggregator.descriptor.num", descriptors.length);
+    conf.setInt(ValueAggregatorJobBase.DESCRIPTOR_NUM, descriptors.length);
     //specify the aggregator descriptors
     for(int i=0; i< descriptors.length; i++) {
-      conf.set("aggregator.descriptor." + i, 
+      conf.set(ValueAggregatorJobBase.DESCRIPTOR + i, 
                "UserDefined," + descriptors[i].getName());
     }
     return conf;
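
For drivers that assemble the job configuration by hand, the renamed aggregate keys are now reachable through the constants declared in UniqValueCount and ValueAggregatorJobBase instead of bare strings. A minimal sketch (the jar path and the limit are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.aggregate.UniqValueCount;
    import org.apache.hadoop.mapreduce.lib.aggregate.ValueAggregatorJobBase;

    public class AggregateConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Formerly "aggregate.max.num.unique.values": cap on the distinct
        // values tracked per key by the combiner and the reducer.
        conf.setLong(UniqValueCount.MAX_NUM_UNIQUE_VALUES, 100000L);
        // Formerly "user.jar.file": picked up by ValueAggregatorJob and
        // copied into the job's jar setting (JobContext.JAR).
        conf.set(ValueAggregatorJobBase.USER_JAR, "/tmp/my-aggregators.jar");
      }
    }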

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJobBase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJobBase.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJobBase.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJobBase.java Sat Nov 28 20:26:01 2009
@@ -31,7 +31,11 @@
 public class ValueAggregatorJobBase<K1 extends WritableComparable<?>,
                                              V1 extends Writable>
 {
-
+  public static final String DESCRIPTOR = "mapreduce.aggregate.descriptor";
+  public static final String DESCRIPTOR_NUM = 
+    "mapreduce.aggregate.descriptor.num";
+  public static final String USER_JAR = "mapreduce.aggregate.user.jar.file";
+  
   protected static ArrayList<ValueAggregatorDescriptor> aggregatorDescriptorList = null;
 
   public static void setup(Configuration job) {
@@ -54,12 +58,11 @@
 
   protected static ArrayList<ValueAggregatorDescriptor> getAggregatorDescriptors(
       Configuration conf) {
-    String advn = "aggregator.descriptor";
-    int num = conf.getInt(advn + ".num", 0);
+    int num = conf.getInt(DESCRIPTOR_NUM, 0);
     ArrayList<ValueAggregatorDescriptor> retv = 
       new ArrayList<ValueAggregatorDescriptor>(num);
     for (int i = 0; i < num; i++) {
-      String spec = conf.get(advn + "." + i);
+      String spec = conf.get(DESCRIPTOR + "." + i);
       ValueAggregatorDescriptor ad = getValueAggregatorDescriptor(spec, conf);
       if (ad != null) {
         retv.add(ad);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorReducer.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorReducer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorReducer.java Sat Nov 28 20:26:01 2009
@@ -55,7 +55,7 @@
     keyStr = keyStr.substring(pos + 
                ValueAggregatorDescriptor.TYPE_SEPARATOR.length());
     long uniqCount = context.getConfiguration().
-      getLong("aggregate.max.num.unique.values", Long.MAX_VALUE);
+      getLong(UniqValueCount.MAX_NUM_UNIQUE_VALUES, Long.MAX_VALUE);
     ValueAggregator aggregator = ValueAggregatorBaseDescriptor
       .generateValueAggregator(type, uniqCount);
     for (Text value : values) {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java Sat Nov 28 20:26:01 2009
@@ -43,54 +43,59 @@
 
   /** The JDBC Driver class name */
   public static final String DRIVER_CLASS_PROPERTY = 
-      "mapred.jdbc.driver.class";
+    "mapreduce.jdbc.driver.class";
   
   /** JDBC Database access URL */
-  public static final String URL_PROPERTY = "mapred.jdbc.url";
+  public static final String URL_PROPERTY = "mapreduce.jdbc.url";
 
   /** User name to access the database */
-  public static final String USERNAME_PROPERTY = "mapred.jdbc.username";
+  public static final String USERNAME_PROPERTY = "mapreduce.jdbc.username";
   
   /** Password to access the database */
-  public static final String PASSWORD_PROPERTY = "mapred.jdbc.password";
+  public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password";
 
   /** Input table name */
   public static final String INPUT_TABLE_NAME_PROPERTY = 
-      "mapred.jdbc.input.table.name";
+    "mapreduce.jdbc.input.table.name";
 
   /** Field names in the Input table */
   public static final String INPUT_FIELD_NAMES_PROPERTY = 
-      "mapred.jdbc.input.field.names";
+    "mapreduce.jdbc.input.field.names";
 
   /** WHERE clause in the input SELECT statement */
   public static final String INPUT_CONDITIONS_PROPERTY = 
-      "mapred.jdbc.input.conditions";
+    "mapreduce.jdbc.input.conditions";
   
   /** ORDER BY clause in the input SELECT statement */
   public static final String INPUT_ORDER_BY_PROPERTY = 
-      "mapred.jdbc.input.orderby";
+    "mapreduce.jdbc.input.orderby";
   
   /** Whole input query, exluding LIMIT...OFFSET */
-  public static final String INPUT_QUERY = "mapred.jdbc.input.query";
+  public static final String INPUT_QUERY = "mapreduce.jdbc.input.query";
   
   /** Input query to get the count of records */
   public static final String INPUT_COUNT_QUERY = 
-      "mapred.jdbc.input.count.query";
+    "mapreduce.jdbc.input.count.query";
+  
+  /** Input query to get the max and min values of the jdbc.input.query */
+  public static final String INPUT_BOUNDING_QUERY =
+      "mapred.jdbc.input.bounding.query";
   
   /** Class name implementing DBWritable which will hold input tuples */
-  public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class";
+  public static final String INPUT_CLASS_PROPERTY = 
+    "mapreduce.jdbc.input.class";
 
   /** Output table name */
   public static final String OUTPUT_TABLE_NAME_PROPERTY = 
-      "mapred.jdbc.output.table.name";
+    "mapreduce.jdbc.output.table.name";
 
   /** Field names in the Output table */
   public static final String OUTPUT_FIELD_NAMES_PROPERTY = 
-      "mapred.jdbc.output.field.names";  
+    "mapreduce.jdbc.output.field.names";  
 
   /** Number of fields in the Output table */
   public static final String OUTPUT_FIELD_COUNT_PROPERTY = 
-      "mapred.jdbc.output.field.count";  
+    "mapreduce.jdbc.output.field.count";  
   
   /**
    * Sets the DB access related fields in the {@link Configuration}.  
@@ -207,7 +212,17 @@
       conf.set(DBConfiguration.INPUT_COUNT_QUERY, query);
     }
   }
-  
+
+  public void setInputBoundingQuery(String query) {
+    if (query != null && query.length() > 0) {
+      conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
+    }
+  }
+
+  public String getInputBoundingQuery() {
+    return conf.get(DBConfiguration.INPUT_BOUNDING_QUERY);
+  }
+
   public Class<?> getInputClass() {
     return conf.getClass(DBConfiguration.INPUT_CLASS_PROPERTY,
                          NullDBWritable.class);
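
The bounding query added above is meant to return the minimum and maximum of the split column so that a splitter can divide that range among map tasks instead of relying on LIMIT/OFFSET. A minimal sketch, assuming a table named employees with a numeric id column and the public DBConfiguration(Configuration) constructor used elsewhere in this package:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;

    public class BoundingQuerySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        DBConfiguration dbConf = new DBConfiguration(conf);
        // Stored under DBConfiguration.INPUT_BOUNDING_QUERY; a null or empty
        // string is silently ignored by the setter.
        dbConf.setInputBoundingQuery("SELECT MIN(id), MAX(id) FROM employees");
        System.out.println(dbConf.getInputBoundingQuery());
      }
    }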

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java Sat Nov 28 20:26:01 2009
@@ -172,7 +172,15 @@
   public DBConfiguration getDBConf() {
     return dbConf;
   }
-  
+
+  public Connection getConnection() {
+    return connection;
+  }
+
+  public String getDBProductName() {
+    return dbProductName;
+  }
+
   protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
       Configuration conf) throws IOException {
 
@@ -218,7 +226,7 @@
       results.next();
 
       long count = results.getLong(1);
-      int chunks = job.getConfiguration().getInt("mapred.map.tasks", 1);
+      int chunks = job.getConfiguration().getInt(JobContext.NUM_MAPS, 1);
       long chunkSize = (count / chunks);
 
       results.close();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java Sat Nov 28 20:26:01 2009
@@ -30,6 +30,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -49,7 +51,10 @@
  */
 public class DBRecordReader<T extends DBWritable> extends
     RecordReader<LongWritable, T> {
-  private ResultSet results;
+
+  private static final Log LOG = LogFactory.getLog(DBRecordReader.class);
+
+  private ResultSet results = null;
 
   private Class<T> inputClass;
 
@@ -65,7 +70,7 @@
 
   private Connection connection;
 
-  private PreparedStatement statement;
+  protected PreparedStatement statement;
 
   private DBConfiguration dbConf;
 
@@ -91,8 +96,6 @@
     this.conditions = cond;
     this.fieldNames = fields;
     this.tableName = table;
-    
-    this.results = executeQuery(getSelectQuery());
   }
 
   protected ResultSet executeQuery(String query) throws SQLException {
@@ -104,7 +107,7 @@
   /** Returns the query for selecting the records, 
    * subclasses can override this for custom behaviour.*/
   protected String getSelectQuery() {
-      StringBuilder query = new StringBuilder();
+    StringBuilder query = new StringBuilder();
 
     // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
     if(dbConf.getInputQuery() == null) {
@@ -214,6 +217,10 @@
       if (value == null) {
         value = createValue();
       }
+      if (null == this.results) {
+        // First time into this method, run the query.
+        this.results = executeQuery(getSelectQuery());
+      }
       if (!results.next())
         return false;
 
@@ -252,4 +259,12 @@
   protected Connection getConnection() {
     return connection;
   }
+
+  protected PreparedStatement getStatement() {
+    return statement;
+  }
+
+  protected void setStatement(PreparedStatement stmt) {
+    this.statement = stmt;
+  }
 }
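
Because the SELECT is now executed lazily on the first nextKeyValue() call rather than in the constructor, a database-specific reader can prepare its own statement and hand it back through setStatement(), as the MySQL reader below does. A minimal sketch of such a subclass; the constructor parameter list mirrors this branch's DBRecordReader and, like the fetch size, is an assumption for illustration rather than a fixed API:

    package org.apache.hadoop.mapreduce.lib.db;

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.hadoop.conf.Configuration;

    // Sketch only: a reader that runs the query with a bounded fetch size.
    public class BatchedDBRecordReader<T extends DBWritable>
        extends DBRecordReader<T> {

      public BatchedDBRecordReader(DBInputFormat.DBInputSplit split,
          Class<T> inputClass, Configuration conf, Connection conn,
          DBConfiguration dbConfig, String cond, String[] fields, String table)
          throws SQLException {
        super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
      }

      @Override
      protected ResultSet executeQuery(String query) throws SQLException {
        // Invoked lazily from nextKeyValue(), not at construction time.
        PreparedStatement stmt = getConnection().prepareStatement(query,
            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(1000);   // illustrative batch size
        setStatement(stmt);        // expose it to the base class (e.g. close())
        return stmt.executeQuery();
      }
    }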

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java Sat Nov 28 20:26:01 2009
@@ -38,7 +38,7 @@
 
   // Execute statements for mysql in unbuffered mode.
   protected ResultSet executeQuery(String query) throws SQLException {
-    PreparedStatement statement = getConnection().prepareStatement(query,
+    statement = getConnection().prepareStatement(query,
       ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
     statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
     return statement.executeQuery();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java Sat Nov 28 20:26:01 2009
@@ -32,9 +32,9 @@
  * fields are from the value only. Otherwise, the fields are the union of those
  * from the key and those from the value.
  * 
- * The field separator is under attribute "mapred.data.field.separator"
+ * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
  * 
- * The map output field list spec is under attribute "map.output.key.value.fields.spec".
+ * The map output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec".
  * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
  * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
  * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
@@ -45,7 +45,7 @@
  * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
  * and use fields 6,5,1,2,3,7 and above for values.
  * 
- * The reduce output field list spec is under attribute "reduce.output.key.value.fields.spec".
+ * The reduce output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec".
  * 
  * The reducer extracts output key/value pairs in a similar manner, except that
  * the key is never ignored.
@@ -54,6 +54,13 @@
 public class FieldSelectionHelper {
 
   public static Text emptyText = new Text("");
+  public static final String DATA_FIELD_SEPERATOR = 
+    "mapreduce.fieldsel.data.field.separator";
+  public static final String MAP_OUTPUT_KEY_VALUE_SPEC = 
+    "mapreduce.fieldsel.mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec";
+  public static final String REDUCE_OUTPUT_KEY_VALUE_SPEC = 
+    "mapreduce.fieldsel.mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec";
+
 
   /**
    * Extract the actual field numbers from the given field specs.

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java Sat Nov 28 20:26:01 2009
@@ -39,10 +39,10 @@
  * fields are from the value only. Otherwise, the fields are the union of those
  * from the key and those from the value.
  * 
- * The field separator is under attribute "mapred.data.field.separator"
+ * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
  * 
  * The map output field list spec is under attribute 
- * "map.output.key.value.fields.spec". The value is expected to be like
+ * "mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec". The value is expected to be like
  * "keyFieldsSpec:valueFieldsSpec" key/valueFieldsSpec are comma (,) separated
  * field spec: fieldSpec,fieldSpec,fieldSpec ... Each field spec can be a 
  * simple number (e.g. 5) specifying a specific field, or a range (like 2-5)
@@ -73,9 +73,10 @@
   public void setup(Context context) 
       throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
-    this.fieldSeparator = conf.get("mapred.data.field.separator", "\t");
+    this.fieldSeparator = 
+      conf.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
     this.mapOutputKeyValueSpec = 
-      conf.get("map.output.key.value.fields.spec", "0-:");
+      conf.get(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0-:");
     try {
       this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
         context.getInputFormatClass().getCanonicalName());

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java Sat Nov 28 20:26:01 2009
@@ -38,10 +38,10 @@
  * the reduce output values. The fields are the union of those from the key
  * and those from the value.
  * 
- * The field separator is under attribute "mapred.data.field.separator"
+ * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
  * 
  * The reduce output field list spec is under attribute 
- * "reduce.output.key.value.fields.spec". The value is expected to be like
+ * "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec". The value is expected to be like
  * "keyFieldsSpec:valueFieldsSpec" key/valueFieldsSpec are comma (,) 
  * separated field spec: fieldSpec,fieldSpec,fieldSpec ... Each field spec
  * can be a simple number (e.g. 5) specifying a specific field, or a range
@@ -71,10 +71,11 @@
       throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
     
-    this.fieldSeparator = conf.get("mapred.data.field.separator", "\t");
+    this.fieldSeparator = 
+      conf.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
     
     this.reduceOutputKeyValueSpec = 
-      conf.get("reduce.output.key.value.fields.spec", "0-:");
+      conf.get(FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:");
     
     allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
       reduceOutputKeyValueSpec, reduceOutputKeyFieldList,

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Sat Nov 28 20:26:01 2009
@@ -68,6 +68,10 @@
 public abstract class CombineFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
 
+  public static final String SPLIT_MINSIZE_PERNODE = 
+    "mapreduce.input.fileinputformat.split.minsize.per.node";
+  public static final String SPLIT_MINSIZE_PERRACK = 
+    "mapreduce.input.fileinputformat.split.minsize.per.rack";
   // ability to limit the size of a single split
   private long maxSplitSize = 0;
   private long minSplitSizeNode = 0;
@@ -151,17 +155,17 @@
     if (minSplitSizeNode != 0) {
       minSizeNode = minSplitSizeNode;
     } else {
-      minSizeNode = conf.getLong("mapred.min.split.size.per.node", 0);
+      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
     }
     if (minSplitSizeRack != 0) {
       minSizeRack = minSplitSizeRack;
     } else {
-      minSizeRack = conf.getLong("mapred.min.split.size.per.rack", 0);
+      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
     }
     if (maxSplitSize != 0) {
       maxSize = maxSplitSize;
     } else {
-      maxSize = conf.getLong("mapred.max.split.size", 0);
+      maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
     }
     if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
       throw new IOException("Minimum split size pernode " + minSizeNode +

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java Sat Nov 28 20:26:01 2009
@@ -140,9 +140,9 @@
 
       Configuration conf = context.getConfiguration();
       // setup some helper config variables.
-      conf.set("map.input.file", split.getPath(idx).toString());
-      conf.setLong("map.input.start", split.getOffset(idx));
-      conf.setLong("map.input.length", split.getLength(idx));
+      conf.set(JobContext.MAP_INPUT_FILE, split.getPath(idx).toString());
+      conf.setLong(JobContext.MAP_INPUT_START, split.getOffset(idx));
+      conf.setLong(JobContext.MAP_INPUT_PATH, split.getLength(idx));
     } catch (Exception e) {
       throw new RuntimeException (e);
     }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java Sat Nov 28 20:26:01 2009
@@ -119,17 +119,9 @@
     return splits;
   }
 
-  @SuppressWarnings("unchecked")
+  @Override
   public RecordReader<K, V> createRecordReader(InputSplit split,
       TaskAttemptContext context) throws IOException, InterruptedException {
-
-    // Find the InputFormat and then the RecordReader from the
-    // TaggedInputSplit.
-    TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
-    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
-      .newInstance(taggedInputSplit.getInputFormatClass(),
-         context.getConfiguration());
-    return inputFormat.createRecordReader(taggedInputSplit.getInputSplit(),
-      context);
+    return new DelegatingRecordReader<K, V>(split, context);
   }
 }
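
DelegatingInputFormat is normally installed by MultipleInputs, and with this change the per-split InputFormat lookup moves from here into the new DelegatingRecordReader, which unwraps the TaggedInputSplit at run time. A minimal driver sketch, assuming the new-API MultipleInputs in this branch and using the identity Mapper for brevity:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class MultipleInputsSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job();
        // Each path carries its own InputFormat (and optionally its own
        // Mapper); the job itself runs DelegatingInputFormat under the covers.
        MultipleInputs.addInputPath(job, new Path("/input/text"),
            TextInputFormat.class, Mapper.class);
        MultipleInputs.addInputPath(job, new Path("/input/seq"),
            SequenceFileInputFormat.class, Mapper.class);
      }
    }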

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Sat Nov 28 20:26:01 2009
@@ -52,6 +52,14 @@
   public static final String COUNTER_GROUP = 
                                 "FileInputFormatCounters";
   public static final String BYTES_READ = "BYTES_READ";
+  public static final String INPUT_DIR = 
+    "mapreduce.input.fileinputformat.inputdir";
+  public static final String SPLIT_MAXSIZE = 
+    "mapreduce.input.fileinputformat.split.maxsize";
+  public static final String SPLIT_MINSIZE = 
+    "mapreduce.input.fileinputformat.split.minsize";
+  public static final String PATHFILTER_CLASS = 
+    "mapreduce.input.pathFilter.class";
 
   private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
 
@@ -117,7 +125,7 @@
    */
   public static void setInputPathFilter(Job job,
                                         Class<? extends PathFilter> filter) {
-    job.getConfiguration().setClass("mapred.input.pathFilter.class", filter, 
+    job.getConfiguration().setClass(PATHFILTER_CLASS, filter, 
                                     PathFilter.class);
   }
 
@@ -128,7 +136,7 @@
    */
   public static void setMinInputSplitSize(Job job,
                                           long size) {
-    job.getConfiguration().setLong("mapred.min.split.size", size);
+    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
   }
 
   /**
@@ -137,7 +145,7 @@
    * @return the minimum number of bytes that can be in a split
    */
   public static long getMinSplitSize(JobContext job) {
-    return job.getConfiguration().getLong("mapred.min.split.size", 1L);
+    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
   }
 
   /**
@@ -147,7 +155,7 @@
    */
   public static void setMaxInputSplitSize(Job job,
                                           long size) {
-    job.getConfiguration().setLong("mapred.max.split.size", size);
+    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
   }
 
   /**
@@ -156,7 +164,7 @@
    * @return the maximum number of bytes a split can include
    */
   public static long getMaxSplitSize(JobContext context) {
-    return context.getConfiguration().getLong("mapred.max.split.size", 
+    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                               Long.MAX_VALUE);
   }
 
@@ -167,7 +175,7 @@
    */
   public static PathFilter getInputPathFilter(JobContext context) {
     Configuration conf = context.getConfiguration();
-    Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
+    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
         PathFilter.class);
     return (filterClass != null) ?
         (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
@@ -344,7 +352,7 @@
       path = inputPaths[i].makeQualified(fs);
       str.append(StringUtils.escapeString(path.toString()));
     }
-    conf.set("mapred.input.dir", str.toString());
+    conf.set(INPUT_DIR, str.toString());
   }
 
   /**
@@ -360,8 +368,8 @@
     FileSystem fs = FileSystem.get(conf);
     path = path.makeQualified(fs);
     String dirStr = StringUtils.escapeString(path.toString());
-    String dirs = conf.get("mapred.input.dir");
-    conf.set("mapred.input.dir", dirs == null ? dirStr : dirs + "," + dirStr);
+    String dirs = conf.get(INPUT_DIR);
+    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
   }
   
   // This method escapes commas in the glob pattern of the given paths.
@@ -410,7 +418,7 @@
    * @return the list of input {@link Path}s for the map-reduce job.
    */
   public static Path[] getInputPaths(JobContext context) {
-    String dirs = context.getConfiguration().get("mapred.input.dir", "");
+    String dirs = context.getConfiguration().get(INPUT_DIR, "");
     String [] list = StringUtils.split(dirs);
     Path[] result = new Path[list.length];
     for (int i = 0; i < list.length; i++) {
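
With the literal keys replaced by the public constants added above, driver code can set input paths and split-size bounds without magic strings. A small sketch under the new names (the sizes and job name are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SplitSizeDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "split-size-demo");

        // The setters below now write the FileInputFormat.* constants.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB
        FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // 256 MB

        // Reading a value back goes through the same constant.
        long min = job.getConfiguration().getLong(FileInputFormat.SPLIT_MINSIZE, 1L);
        System.out.println("min split size = " + min);
      }
    }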

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java Sat Nov 28 20:26:01 2009
@@ -29,10 +29,12 @@
 /**
  * This class treats a line in the input as a key/value pair separated by a 
  * separator character. The separator can be specified in config file 
- * under the attribute name key.value.separator.in.input.line. The default
+ * under the attribute name mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
  * separator is the tab character ('\t').
  */
 public class KeyValueLineRecordReader extends RecordReader<Text, Text> {
+  public static final String KEY_VALUE_SEPERATOR = 
+    "mapreduce.input.keyvaluelinerecordreader.key.value.separator";
   
   private final LineRecordReader lineRecordReader;
 
@@ -44,13 +46,11 @@
   
   private Text value;
   
-  public Class<?> getKeyClass() { return Text.class; }
-  
   public KeyValueLineRecordReader(Configuration conf)
     throws IOException {
     
     lineRecordReader = new LineRecordReader();
-    String sepStr = conf.get("key.value.separator.in.input.line", "\t");
+    String sepStr = conf.get(KEY_VALUE_SEPERATOR, "\t");
     this.separator = (byte) sepStr.charAt(0);
   }
 
@@ -75,14 +75,8 @@
       key.set(line, 0, lineLen);
       value.set("");
     } else {
-      int keyLen = pos;
-      byte[] keyBytes = new byte[keyLen];
-      System.arraycopy(line, 0, keyBytes, 0, keyLen);
-      int valLen = lineLen - keyLen - 1;
-      byte[] valBytes = new byte[valLen];
-      System.arraycopy(line, pos + 1, valBytes, 0, valLen);
-      key.set(keyBytes);
-      value.set(valBytes);
+      key.set(line, 0, pos);
+      value.set(line, pos + 1, lineLen - pos - 1);
     }
   }
   /** Read key/value pair in a line. */
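
The separator key is now a public constant on the reader, so a job can switch the delimiter without hard-coding the property name. A minimal sketch (the '=' separator and class name are just examples):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

    public class EqualsSeparatedJob {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Split each line at the first '=' instead of the default tab.
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "=");
        Job job = new Job(conf, "kv-equals");
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // ... mapper, reducer and input/output paths elided ...
      }
    }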

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java Sat Nov 28 20:26:01 2009
@@ -24,6 +24,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -40,9 +41,12 @@
 
   @Override
   protected boolean isSplitable(JobContext context, Path file) {
-    CompressionCodec codec = 
+    final CompressionCodec codec =
       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
-    return codec == null;
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
 
   public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
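
The isSplitable() change stops treating every compressed file as unsplittable: when the resolved codec implements SplittableCompressionCodec, the file may still be split. The same decision in isolation, as a sketch (the class and path are made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.SplittableCompressionCodec;

    public class SplitCheck {
      // Mirrors the new isSplitable(): uncompressed files, and files whose
      // codec supports split-aware decompression, remain splittable.
      static boolean canSplit(Configuration conf, Path file) {
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
        return codec == null || codec instanceof SplittableCompressionCodec;
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Prints true only if the configured bzip2 codec implements
        // SplittableCompressionCodec in the running Hadoop version.
        System.out.println(canSplit(conf, new Path("data/part-00000.bz2")));
      }
    }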

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Sat Nov 28 20:26:01 2009
@@ -24,10 +24,13 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.mapreduce.Counter;
@@ -38,12 +41,15 @@
 import org.apache.hadoop.util.LineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.Seekable;
 
 /**
  * Treats keys as offset in file and value as line. 
  */
 public class LineRecordReader extends RecordReader<LongWritable, Text> {
   private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
+  public static final String MAX_LINE_LENGTH = 
+    "mapreduce.input.linerecordreader.line.maxlength";
 
   private CompressionCodecFactory compressionCodecs = null;
   private long start;
@@ -51,6 +57,7 @@
   private long end;
   private LineReader in;
   private FSDataInputStream fileIn;
+  private Seekable filePosition;
   private int maxLineLength;
   private LongWritable key = null;
   private Text value = null;
@@ -64,8 +71,7 @@
     inputByteCounter = ((MapContext)context).getCounter(
       FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
     Configuration job = context.getConfiguration();
-    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
-                                    Integer.MAX_VALUE);
+    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
     start = split.getStart();
     end = start + split.getLength();
     final Path file = split.getPath();
@@ -73,40 +79,57 @@
     codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
-    FileSystem fs = file.getFileSystem(job);
-    fileIn = fs.open(split.getPath());
+    final FileSystem fs = file.getFileSystem(job);
+    fileIn = fs.open(file);
     if (isCompressedInput()) {
       decompressor = CodecPool.getDecompressor(codec);
-      in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
+      if (codec instanceof SplittableCompressionCodec) {
+        final SplitCompressionInputStream cIn =
+          ((SplittableCompressionCodec)codec).createInputStream(
+            fileIn, decompressor, start, end,
+            SplittableCompressionCodec.READ_MODE.BYBLOCK);
+        in = new LineReader(cIn, job);
+        start = cIn.getAdjustedStart();
+        end = cIn.getAdjustedEnd();
+        filePosition = cIn;
+      } else {
+        in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
+        filePosition = fileIn;
+      }
     } else {
       fileIn.seek(start);
       in = new LineReader(fileIn, job);
+      filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
     // because we always (except the last split) read one extra line in
     // next() method.
     if (start != 0) {
-      start += in.readLine(new Text(), 0, maxBytesToConsume());
+      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
     }
     this.pos = start;
   }
   
-  private boolean isCompressedInput() { return (codec != null); }
-  
-  private int maxBytesToConsume() {
-    return (isCompressedInput()) ? Integer.MAX_VALUE
-                           : (int) Math.min(Integer.MAX_VALUE, (end - start));
+  private boolean isCompressedInput() {
+    return (codec != null);
   }
-  
+
+  private int maxBytesToConsume(long pos) {
+    return isCompressedInput()
+      ? Integer.MAX_VALUE
+      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+  }
+
   private long getFilePosition() throws IOException {
     long retVal;
-    if (isCompressedInput()) {
-      retVal = fileIn.getPos();
+    if (isCompressedInput() && null != filePosition) {
+      retVal = filePosition.getPos();
     } else {
       retVal = pos;
     }
     return retVal;
   }
+
   public boolean nextKeyValue() throws IOException {
     if (key == null) {
       key = new LongWritable();
@@ -120,7 +143,7 @@
     // split limit i.e. (end - 1)
     while (getFilePosition() <= end) {
       newSize = in.readLine(value, maxLineLength,
-                            Math.max(maxBytesToConsume(), maxLineLength));
+          Math.max(maxBytesToConsume(pos), maxLineLength));
       if (newSize == 0) {
         break;
       }
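
Besides the split-aware decompression path, the per-record length cap now has a named key, MAX_LINE_LENGTH. A job that wants to skip pathologically long lines can set it through the constant; a sketch (the 1 MB cap is arbitrary):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

    public class LongLineSafeJob {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Lines longer than 1 MB are skipped by the reader rather than
        // buffered whole.
        conf.setInt(LineRecordReader.MAX_LINE_LENGTH, 1024 * 1024);
        Job job = new Job(conf, "long-line-safe");
        // ... mapper, reducer and input/output paths elided ...
      }
    }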

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java Sat Nov 28 20:26:01 2009
@@ -34,6 +34,11 @@
  * a different {@link InputFormat} and {@link Mapper} for each path 
  */
 public class MultipleInputs {
+  public static final String DIR_FORMATS = 
+    "mapreduce.input.multipleinputs.dir.formats";
+  public static final String DIR_MAPPERS = 
+    "mapreduce.input.multipleinputs.dir.mappers";
+  
   /**
    * Add a {@link Path} with a custom {@link InputFormat} to the list of
    * inputs for the map-reduce job.
@@ -48,8 +53,8 @@
     String inputFormatMapping = path.toString() + ";"
        + inputFormatClass.getName();
     Configuration conf = job.getConfiguration();
-    String inputFormats = conf.get("mapred.input.dir.formats");
-    conf.set("mapred.input.dir.formats",
+    String inputFormats = conf.get(DIR_FORMATS);
+    conf.set(DIR_FORMATS,
        inputFormats == null ? inputFormatMapping : inputFormats + ","
            + inputFormatMapping);
 
@@ -73,8 +78,8 @@
     addInputPath(job, path, inputFormatClass);
     Configuration conf = job.getConfiguration();
     String mapperMapping = path.toString() + ";" + mapperClass.getName();
-    String mappers = conf.get("mapred.input.dir.mappers");
-    conf.set("mapred.input.dir.mappers", mappers == null ? mapperMapping
+    String mappers = conf.get(DIR_MAPPERS);
+    conf.set(DIR_MAPPERS, mappers == null ? mapperMapping
        : mappers + "," + mapperMapping);
 
     job.setMapperClass(DelegatingMapper.class);
@@ -92,7 +97,7 @@
   static Map<Path, InputFormat> getInputFormatMap(JobContext job) {
     Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
     Configuration conf = job.getConfiguration();
-    String[] pathMappings = conf.get("mapred.input.dir.formats").split(",");
+    String[] pathMappings = conf.get(DIR_FORMATS).split(",");
     for (String pathMapping : pathMappings) {
       String[] split = pathMapping.split(";");
       InputFormat inputFormat;
@@ -119,12 +124,12 @@
   static Map<Path, Class<? extends Mapper>> 
       getMapperTypeMap(JobContext job) {
     Configuration conf = job.getConfiguration();
-    if (conf.get("mapred.input.dir.mappers") == null) {
+    if (conf.get(DIR_MAPPERS) == null) {
       return Collections.emptyMap();
     }
     Map<Path, Class<? extends Mapper>> m = 
       new HashMap<Path, Class<? extends Mapper>>();
-    String[] pathMappings = conf.get("mapred.input.dir.mappers").split(",");
+    String[] pathMappings = conf.get(DIR_MAPPERS).split(",");
     for (String pathMapping : pathMappings) {
       String[] split = pathMapping.split(";");
       Class<? extends Mapper> mapClass;
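
The per-path format and mapper mappings are now stored under the DIR_FORMATS and DIR_MAPPERS keys. Driver-side usage is unchanged; a sketch (the paths are made up, and a real job would pass its own Mapper subclasses rather than the identity Mapper used here to keep the sketch self-contained):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class MixedInputDriver {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "mixed-inputs");
        // Each call appends a "path;class" entry under DIR_FORMATS / DIR_MAPPERS
        // and switches the job to DelegatingInputFormat / DelegatingMapper.
        MultipleInputs.addInputPath(job, new Path("/logs/raw"),
            TextInputFormat.class, Mapper.class);
        MultipleInputs.addInputPath(job, new Path("/logs/kv"),
            KeyValueTextInputFormat.class, Mapper.class);
      }
    }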

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java Sat Nov 28 20:26:01 2009
@@ -56,6 +56,8 @@
  */
 
 public class NLineInputFormat extends FileInputFormat<LongWritable, Text> { 
+  public static final String LINES_PER_MAP = 
+    "mapreduce.input.lineinputformat.linespermap";
 
   public RecordReader<LongWritable, Text> createRecordReader(
       InputSplit genericSplit, TaskAttemptContext context) 
@@ -136,8 +138,7 @@
    * @param numLines the number of lines per split
    */
   public static void setNumLinesPerSplit(Job job, int numLines) {
-    job.getConfiguration().setInt(
-      "mapred.line.input.format.linespermap", numLines);
+    job.getConfiguration().setInt(LINES_PER_MAP, numLines);
   }
 
   /**
@@ -146,7 +147,6 @@
    * @return the number of lines per split
    */
   public static int getNumLinesPerSplit(JobContext job) {
-    return job.getConfiguration().getInt(
-      "mapred.line.input.format.linespermap", 1);
+    return job.getConfiguration().getInt(LINES_PER_MAP, 1);
   }
 }
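
LINES_PER_MAP replaces the old literal; the existing static setter keeps working and now writes the constant. A sketch (1000 lines per split is arbitrary):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

    public class NLineDriver {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "nline-demo");
        job.setInputFormatClass(NLineInputFormat.class);
        // Hand each map task 1000 input lines; writes LINES_PER_MAP under the hood.
        NLineInputFormat.setNumLinesPerSplit(job, 1000);
      }
    }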

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java Sat Nov 28 20:26:01 2009
@@ -46,10 +46,12 @@
     extends SequenceFileInputFormat<K, V> {
   public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
   
-  final public static String FILTER_CLASS = "sequencefile.filter.class";
-  final private static String FILTER_FREQUENCY
-    = "sequencefile.filter.frequency";
-  final private static String FILTER_REGEX = "sequencefile.filter.regex";
+  final public static String FILTER_CLASS = 
+    "mapreduce.input.mapreduce.input.mapreduce.input.sequencefileinputfilter.class";
+  final public static String FILTER_FREQUENCY = 
+    "mapreduce.input.mapreduce.input.mapreduce.input.sequencefileinputfilter.frequency";
+  final public static String FILTER_REGEX = 
+    "mapreduce.input.mapreduce.input.mapreduce.input.sequencefileinputfilter.regex";
     
   public SequenceFileInputFilter() {
   }
@@ -166,7 +168,7 @@
      * @param conf configuration
      */
     public void setConf(Configuration conf) {
-      this.frequency = conf.getInt("sequencefile.filter.frequency", 10);
+      this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
       if (this.frequency <= 0) {
         throw new RuntimeException(
           "Negative "+FILTER_FREQUENCY + ": " + this.frequency);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java Sat Nov 28 20:26:01 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -43,9 +44,12 @@
 
   @Override
   protected boolean isSplitable(JobContext context, Path file) {
-    CompressionCodec codec = 
+    final CompressionCodec codec =
       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
-    return codec == null;
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
 
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java Sat Nov 28 20:26:01 2009
@@ -48,7 +48,7 @@
   // A job will be in one of the following states
   public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
                             DEPENDENT_FAILED}; 
-	
+  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
   private State state;
   private String controlID;     // assigned and used by JobControl class
   private Job job;               // mapreduce job to be executed.
@@ -228,7 +228,7 @@
     return this.state == State.READY;
   }
 
-  public void killJob() throws IOException {
+  public void killJob() throws IOException, InterruptedException {
     job.killJob();
   }
   
@@ -236,7 +236,7 @@
    * Check the state of this running job. The state may 
    * remain the same, become SUCCESS or FAILED.
    */
-  private void checkRunningState() {
+  private void checkRunningState() throws IOException, InterruptedException {
     try {
       if (job.isComplete()) {
         if (job.isSuccessful()) {
@@ -261,7 +261,7 @@
    * Check and update the state of this job. The state changes  
    * depending on its current state and the states of the depending jobs.
    */
-   synchronized State checkState() {
+   synchronized State checkState() throws IOException, InterruptedException {
     if (this.state == State.RUNNING) {
       checkRunningState();
     }
@@ -303,7 +303,7 @@
   protected synchronized void submit() {
     try {
       Configuration conf = job.getConfiguration();
-      if (conf.getBoolean("create.empty.dir.if.nonexist", false)) {
+      if (conf.getBoolean(CREATE_DIR, false)) {
         FileSystem fs = FileSystem.get(conf);
         Path inputPaths[] = FileInputFormat.getInputPaths(job);
         for (int i = 0; i < inputPaths.length; i++) {
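
CREATE_DIR replaces the "create.empty.dir.if.nonexist" literal read in submit(). A sketch of turning it on for one controlled job; the (job, depending-jobs) constructor is assumed from the existing ControlledJob API, which this hunk does not show:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;

    public class CreateDirDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Ask submit() to create missing input directories instead of failing.
        conf.setBoolean(ControlledJob.CREATE_DIR, true);
        ControlledJob stage = new ControlledJob(new Job(conf, "stage-1"), null);
      }
    }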

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java Sat Nov 28 20:26:01 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.lib.jobcontrol;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Hashtable;
@@ -206,7 +207,8 @@
     }
   }
 	
-  synchronized private void checkRunningJobs() {
+  synchronized private void checkRunningJobs() 
+      throws IOException, InterruptedException {
 		
     Map<String, ControlledJob> oldJobs = null;
     oldJobs = this.runningJobs;
@@ -218,7 +220,8 @@
     }
   }
 	
-  synchronized private void checkWaitingJobs() {
+  synchronized private void checkWaitingJobs() 
+      throws IOException, InterruptedException {
     Map<String, ControlledJob> oldJobs = null;
     oldJobs = this.waitingJobs;
     this.waitingJobs = new Hashtable<String, ControlledJob>();
@@ -265,9 +268,13 @@
 					
         }
       }
-      checkRunningJobs();	
-      checkWaitingJobs();		
-      startReadyJobs();		
+      try {
+        checkRunningJobs();	
+        checkWaitingJobs();
+        startReadyJobs();
+      } catch (Exception e) {
+  	    this.runnerState = ThreadState.STOPPED;
+      }
       if (this.runnerState != ThreadState.RUNNING && 
           this.runnerState != ThreadState.SUSPENDED) {
         break;
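
Because checkState() and the check*Jobs() methods can now throw, run() catches the failure and parks the runner in STOPPED instead of letting the thread die. The usual driver pattern of running JobControl on its own thread is unaffected; a sketch, with the ControlledJob(Configuration) convenience constructor assumed and the real job wiring elided:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

    public class PipelineDriver {
      public static void main(String[] args) throws Exception {
        ControlledJob stage1 = new ControlledJob(new Configuration());
        ControlledJob stage2 = new ControlledJob(new Configuration());
        stage2.addDependingJob(stage1);      // stage2 waits for stage1

        JobControl control = new JobControl("pipeline");
        control.addJob(stage1);
        control.addJob(stage2);

        Thread runner = new Thread(control, "JobControl");
        runner.setDaemon(true);
        runner.start();
        while (!control.allFinished()) {     // poll until every job settles
          Thread.sleep(5000);
        }
        control.stop();
      }
    }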

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java Sat Nov 28 20:26:01 2009
@@ -121,7 +121,7 @@
   public List<InputSplit> getSplits(JobContext job) 
       throws IOException, InterruptedException {
     setFormat(job.getConfiguration());
-    job.getConfiguration().setLong("mapred.min.split.size", Long.MAX_VALUE);
+    job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", Long.MAX_VALUE);
     return root.getSplits(job);
   }
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,5 @@
-/** * Licensed to the Apache Software Foundation (ASF) under one
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java Sat Nov 28 20:26:01 2009
@@ -42,6 +42,8 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -309,8 +311,9 @@
     
     public List<InputSplit> getSplits(JobContext context)
         throws IOException, InterruptedException {
-      return inf.getSplits(new JobContext(
-        getConf(context.getConfiguration()), context.getJobID()));
+      return inf.getSplits(
+                 new JobContextImpl(getConf(context.getConfiguration()), 
+                                    context.getJobID()));
     }
 
     public ComposableRecordReader<?, ?> createRecordReader(InputSplit split, 
@@ -321,8 +324,10 @@
           throw new IOException("No RecordReader for " + ident);
         }
         Configuration conf = getConf(taskContext.getConfiguration());
-        TaskAttemptContext context = new TaskAttemptContext(conf, 
-          TaskAttemptID.forName(conf.get("mapred.task.id")));
+        TaskAttemptContext context = 
+          new TaskAttemptContextImpl(conf, 
+                                     TaskAttemptID.forName(
+                                         conf.get(JobContext.TASK_ATTEMPT_ID)));
         return rrCstrMap.get(ident).newInstance(id,
             inf.createRecordReader(split, context), cmpcl);
       } catch (IllegalAccessException e) {
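
The join parser now builds the concrete JobContextImpl / TaskAttemptContextImpl from org.apache.hadoop.mapreduce.task, since it can no longer construct the plain JobContext / TaskAttemptContext types directly. Standalone code (tests, glue like the above) creates a context the same way; a sketch with a made-up attempt id:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class ContextSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Parse a task attempt id of the usual attempt_<jt>_<job>_<m|r>_<task>_<n> form.
        TaskAttemptID id =
            TaskAttemptID.forName("attempt_200911280001_0001_m_000000_0");
        TaskAttemptContextImpl context = new TaskAttemptContextImpl(conf, id);
        System.out.println(context.getTaskAttemptID());
      }
    }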

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Sat Nov 28 20:26:01 2009
@@ -24,11 +24,13 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -56,6 +58,9 @@
   extends Mapper<K1, V1, K2, V2> {
 
   private static final Log LOG = LogFactory.getLog(MultithreadedMapper.class);
+  public static String NUM_THREADS = "mapreduce.mapper.multithreadedmapper.threads";
+  public static String MAP_CLASS = "mapreduce.mapper.multithreadedmapper.mapclass";
+  
   private Class<? extends Mapper<K1,V1,K2,V2>> mapClass;
   private Context outer;
   private List<MapRunner> runners;
@@ -66,8 +71,7 @@
    * @return the number of threads
    */
   public static int getNumberOfThreads(JobContext job) {
-    return job.getConfiguration().
-            getInt("mapred.map.multithreadedrunner.threads", 10);
+    return job.getConfiguration().getInt(NUM_THREADS, 10);
   }
 
   /**
@@ -76,8 +80,7 @@
    * @param threads the new number of threads
    */
   public static void setNumberOfThreads(Job job, int threads) {
-    job.getConfiguration().setInt("mapred.map.multithreadedrunner.threads", 
-                                  threads);
+    job.getConfiguration().setInt(NUM_THREADS, threads);
   }
 
   /**
@@ -93,8 +96,7 @@
   public static <K1,V1,K2,V2>
   Class<Mapper<K1,V1,K2,V2>> getMapperClass(JobContext job) {
     return (Class<Mapper<K1,V1,K2,V2>>) 
-         job.getConfiguration().getClass("mapred.map.multithreadedrunner.class",
-                                         Mapper.class);
+      job.getConfiguration().getClass(MAP_CLASS, Mapper.class);
   }
   
   /**
@@ -113,8 +115,7 @@
       throw new IllegalArgumentException("Can't have recursive " + 
                                          "MultithreadedMapper instances.");
     }
-    job.getConfiguration().setClass("mapred.map.multithreadedrunner.class",
-                                    cls, Mapper.class);
+    job.getConfiguration().setClass(MAP_CLASS, cls, Mapper.class);
   }
 
   /**
@@ -245,13 +246,15 @@
     MapRunner(Context context) throws IOException, InterruptedException {
       mapper = ReflectionUtils.newInstance(mapClass, 
                                            context.getConfiguration());
-      subcontext = new Context(outer.getConfiguration(), 
-                            outer.getTaskAttemptID(),
-                            new SubMapRecordReader(),
-                            new SubMapRecordWriter(), 
-                            context.getOutputCommitter(),
-                            new SubMapStatusReporter(),
-                            outer.getInputSplit());
+      MapContext<K1, V1, K2, V2> mapContext = 
+        new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(), 
+                                           outer.getTaskAttemptID(),
+                                           new SubMapRecordReader(),
+                                           new SubMapRecordWriter(), 
+                                           context.getOutputCommitter(),
+                                           new SubMapStatusReporter(),
+                                           outer.getInputSplit());
+      subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext);
     }
 
     public Throwable getThrowable() {
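
Thread count and the wrapped mapper class now live under NUM_THREADS and MAP_CLASS, and each worker thread gets its own WrappedMapper-backed context. Driver-side configuration, as a sketch (InnerMapper is a stand-in for a real CPU-heavy mapper):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

    public class MultithreadedDriver {
      // Stand-in for an expensive per-record mapper.
      public static class InnerMapper
          extends Mapper<LongWritable, Text, Text, LongWritable> {
      }

      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "mt-map");
        job.setMapperClass(MultithreadedMapper.class);
        // The wrapped class does the real work; these write MAP_CLASS and NUM_THREADS.
        MultithreadedMapper.setMapperClass(job, InnerMapper.class);
        MultithreadedMapper.setNumberOfThreads(job, 8);
      }
    }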

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/map/RegexMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/map/RegexMapper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/map/RegexMapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/map/RegexMapper.java Sat Nov 28 20:26:01 2009
@@ -31,13 +31,15 @@
 /** A {@link Mapper} that extracts text matching a regular expression. */
 public class RegexMapper<K> extends Mapper<K, Text, Text, LongWritable> {
 
+  public static String PATTERN = "mapreduce.mapper.regex";
+  public static String GROUP = "mapreduce.mapper.regexmapper..group";
   private Pattern pattern;
   private int group;
 
   public void setup(Context context) {
     Configuration conf = context.getConfiguration();
-    pattern = Pattern.compile(conf.get("mapred.mapper.regex"));
-    group = conf.getInt("mapred.mapper.regex.group", 0);
+    pattern = Pattern.compile(conf.get(PATTERN));
+    group = conf.getInt(GROUP, 0);
   }
 
   public void map(K key, Text value,
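
PATTERN and GROUP replace the old mapred.mapper.regex* literals. A grep-style driver sketch (the pattern, group and job name are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.map.RegexMapper;

    public class RegexGrepDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Emit every IPv4-looking token; group 0 is the whole match.
        conf.set(RegexMapper.PATTERN, "\\d+\\.\\d+\\.\\d+\\.\\d+");
        conf.setInt(RegexMapper.GROUP, 0);
        Job job = new Job(conf, "regex-grep");
        job.setMapperClass(RegexMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
      }
    }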


