hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1485845 [2/4] - in /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: ./ src/main/bin/ src/main/conf/ src/main/docs/src/documentation/content/xdocs/ src/main/java/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/ap...
Date Thu, 23 May 2013 20:41:57 GMT
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java Thu May 23 20:41:53 2013
@@ -19,7 +19,10 @@ package org.apache.hadoop.io.nativeio;
 
 import java.io.File;
 import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -27,10 +30,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
 import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.Shell;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 /**
  * JNI wrappers for various native IO-related calls not available in Java.
  * These functions should generally be used alongside a fallback to another
@@ -39,81 +45,341 @@ import org.apache.commons.logging.LogFac
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class NativeIO {
-  // Flags for open() call from bits/fcntl.h
-  public static final int O_RDONLY   =    00;
-  public static final int O_WRONLY   =    01;
-  public static final int O_RDWR     =    02;
-  public static final int O_CREAT    =  0100;
-  public static final int O_EXCL     =  0200;
-  public static final int O_NOCTTY   =  0400;
-  public static final int O_TRUNC    = 01000;
-  public static final int O_APPEND   = 02000;
-  public static final int O_NONBLOCK = 04000;
-  public static final int O_SYNC   =  010000;
-  public static final int O_ASYNC  =  020000;
-  public static final int O_FSYNC = O_SYNC;
-  public static final int O_NDELAY = O_NONBLOCK;
-
-  // Flags for posix_fadvise() from bits/fcntl.h
-  /* No further special treatment.  */
-  public static final int POSIX_FADV_NORMAL = 0; 
-  /* Expect random page references.  */
-  public static final int POSIX_FADV_RANDOM = 1; 
-  /* Expect sequential page references.  */
-  public static final int POSIX_FADV_SEQUENTIAL = 2; 
-  /* Will need these pages.  */
-  public static final int POSIX_FADV_WILLNEED = 3; 
-  /* Don't need these pages.  */
-  public static final int POSIX_FADV_DONTNEED = 4; 
-  /* Data will be accessed once.  */
-  public static final int POSIX_FADV_NOREUSE = 5; 
-
-
-  /* Wait upon writeout of all pages
-     in the range before performing the
-     write.  */
-  public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
-  /* Initiate writeout of all those
-     dirty pages in the range which are
-     not presently under writeback.  */
-  public static final int SYNC_FILE_RANGE_WRITE = 2;
-
-  /* Wait upon writeout of all pages in
-     the range after performing the
-     write.  */
-  public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
+  public static class POSIX {
+    // Flags for open() call from bits/fcntl.h
+    public static final int O_RDONLY   =    00;
+    public static final int O_WRONLY   =    01;
+    public static final int O_RDWR     =    02;
+    public static final int O_CREAT    =  0100;
+    public static final int O_EXCL     =  0200;
+    public static final int O_NOCTTY   =  0400;
+    public static final int O_TRUNC    = 01000;
+    public static final int O_APPEND   = 02000;
+    public static final int O_NONBLOCK = 04000;
+    public static final int O_SYNC   =  010000;
+    public static final int O_ASYNC  =  020000;
+    public static final int O_FSYNC = O_SYNC;
+    public static final int O_NDELAY = O_NONBLOCK;
+
+    // Flags for posix_fadvise() from bits/fcntl.h
+    /* No further special treatment.  */
+    public static final int POSIX_FADV_NORMAL = 0;
+    /* Expect random page references.  */
+    public static final int POSIX_FADV_RANDOM = 1;
+    /* Expect sequential page references.  */
+    public static final int POSIX_FADV_SEQUENTIAL = 2;
+    /* Will need these pages.  */
+    public static final int POSIX_FADV_WILLNEED = 3;
+    /* Don't need these pages.  */
+    public static final int POSIX_FADV_DONTNEED = 4;
+    /* Data will be accessed once.  */
+    public static final int POSIX_FADV_NOREUSE = 5;
+
+
+    /* Wait upon writeout of all pages
+       in the range before performing the
+       write.  */
+    public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
+    /* Initiate writeout of all those
+       dirty pages in the range which are
+       not presently under writeback.  */
+    public static final int SYNC_FILE_RANGE_WRITE = 2;
+
+    /* Wait upon writeout of all pages in
+       the range after performing the
+       write.  */
+    public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
+
+    private static final Log LOG = LogFactory.getLog(NativeIO.class);
+
+    private static boolean nativeLoaded = false;
+    private static boolean fadvisePossible = true;
+    private static boolean syncFileRangePossible = true;
+
+    static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY =
+      "hadoop.workaround.non.threadsafe.getpwuid";
+    static final boolean WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT = false;
+
+    private static long cacheTimeout = -1;
+
+    static {
+      if (NativeCodeLoader.isNativeCodeLoaded()) {
+        try {
+          Configuration conf = new Configuration();
+          workaroundNonThreadSafePasswdCalls = conf.getBoolean(
+            WORKAROUND_NON_THREADSAFE_CALLS_KEY,
+            WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT);
+
+          initNative();
+          nativeLoaded = true;
+          // The option is in seconds; cache timestamps are compared in millis.
+          cacheTimeout = conf.getLong(
+            CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_KEY,
+            CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT) *
+            1000;
+          LOG.debug("Initialized cache for IDs to User/Group mapping with a" +
+            " cache timeout of " + cacheTimeout/1000 + " seconds.");
+
+        } catch (Throwable t) {
+          // This can happen if the user has an older version of libhadoop.so
+          // installed - in this case we can continue without native IO
+          // after warning
+          LOG.error("Unable to initialize NativeIO libraries", t);
+        }
+      }
+    }
 
-  private static final Log LOG = LogFactory.getLog(NativeIO.class);
+    /**
+     * Return true if the JNI-based native IO extensions are available.
+     */
+    public static boolean isAvailable() {
+      return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
+    }
+
+    /** Wrapper around open(2) */
+    public static native FileDescriptor open(String path, int flags, int mode) throws IOException;
+    /** Wrapper around fstat(2) */
+    private static native Stat fstat(FileDescriptor fd) throws IOException;
+
+    /** Native chmod implementation. On UNIX, it is a wrapper around chmod(2) */
+    private static native void chmodImpl(String path, int mode) throws IOException;
+
+    public static void chmod(String path, int mode) throws IOException {
+      if (!Shell.WINDOWS) {
+        chmodImpl(path, mode);
+      } else {
+        try {
+          chmodImpl(path, mode);
+        } catch (NativeIOException nioe) { // carries a Win32 system error code
+          if (nioe.getErrorCode() == 2 || nioe.getErrorCode() == 3) { // FILE/PATH_NOT_FOUND
+            throw new NativeIOException("No such file or directory",
+                Errno.ENOENT);
+          } else {
+            LOG.warn(String.format("NativeIO.chmod error (%d): %s",
+                nioe.getErrorCode(), nioe.getMessage()));
+            throw new NativeIOException("Unknown error", Errno.UNKNOWN);
+          }
+        }
+      }
+    }
+
+    /** Wrapper around posix_fadvise(2) */
+    static native void posix_fadvise(
+      FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;
+
+    /** Wrapper around sync_file_range(2) */
+    static native void sync_file_range(
+      FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;
+
+    /**
+     * Call posix_fadvise on the given file descriptor. See the manpage
+     * for this syscall for more information. On systems where this
+     * call is not available, does nothing.
+     *
+     * @throws NativeIOException if there is an error with the syscall
+     */
+    public static void posixFadviseIfPossible(
+        FileDescriptor fd, long offset, long len, int flags)
+        throws NativeIOException {
+      if (nativeLoaded && fadvisePossible) {
+        try {
+          posix_fadvise(fd, offset, len, flags);
+        } catch (UnsupportedOperationException uoe) {
+          fadvisePossible = false;
+        } catch (UnsatisfiedLinkError ule) {
+          fadvisePossible = false;
+        }
+      }
+    }
+
+    /**
+     * Call sync_file_range on the given file descriptor. See the manpage
+     * for this syscall for more information. On systems where this
+     * call is not available, does nothing.
+     *
+     * @throws NativeIOException if there is an error with the syscall
+     */
+    public static void syncFileRangeIfPossible(
+        FileDescriptor fd, long offset, long nbytes, int flags)
+        throws NativeIOException {
+      if (nativeLoaded && syncFileRangePossible) {
+        try {
+          sync_file_range(fd, offset, nbytes, flags);
+        } catch (UnsupportedOperationException uoe) {
+          syncFileRangePossible = false;
+        } catch (UnsatisfiedLinkError ule) {
+          syncFileRangePossible = false;
+        }
+      }
+    }
+
+    /** Linux only methods used for getOwner() implementation */
+    private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException;
+    private static native String getUserName(long uid) throws IOException;
+
+    /**
+     * Result type of the fstat call
+     */
+    public static class Stat {
+      private int ownerId, groupId;
+      private String owner, group;
+      private int mode;
+
+      // Mode constants
+      public static final int S_IFMT = 0170000;      /* type of file */
+      public static final int   S_IFIFO  = 0010000;  /* named pipe (fifo) */
+      public static final int   S_IFCHR  = 0020000;  /* character special */
+      public static final int   S_IFDIR  = 0040000;  /* directory */
+      public static final int   S_IFBLK  = 0060000;  /* block special */
+      public static final int   S_IFREG  = 0100000;  /* regular */
+      public static final int   S_IFLNK  = 0120000;  /* symbolic link */
+      public static final int   S_IFSOCK = 0140000;  /* socket */
+      public static final int   S_IFWHT  = 0160000;  /* whiteout */
+      public static final int S_ISUID = 0004000;  /* set user id on execution */
+      public static final int S_ISGID = 0002000;  /* set group id on execution */
+      public static final int S_ISVTX = 0001000;  /* save swapped text even after use */
+      public static final int S_IRUSR = 0000400;  /* read permission, owner */
+      public static final int S_IWUSR = 0000200;  /* write permission, owner */
+      public static final int S_IXUSR = 0000100;  /* execute/search permission, owner */
+
+      Stat(int ownerId, int groupId, int mode) {
+        this.ownerId = ownerId;
+        this.groupId = groupId;
+        this.mode = mode;
+      }
+
+      @Override
+      public String toString() {
+        return "Stat(owner='" + owner + "', group='" + group + "'" +
+          ", mode=" + mode + ")";
+      }
+
+      public String getOwner() {
+        return owner;
+      }
+      public String getGroup() {
+        return group;
+      }
+      public int getMode() {
+        return mode;
+      }
+    }
+
+    /**
+     * Returns the file stat for a file descriptor.
+     *
+     * @param fd file descriptor.
+     * @return the file descriptor file stat.
+     * @throws IOException thrown if there was an IO error while obtaining the file stat.
+     */
+    public static Stat getFstat(FileDescriptor fd) throws IOException {
+      Stat stat = fstat(fd);
+      stat.owner = getName(IdCache.USER, stat.ownerId);
+      stat.group = getName(IdCache.GROUP, stat.groupId);
+      return stat;
+    }
+
+    private static String getName(IdCache domain, int id) throws IOException {
+      Map<Integer, CachedName> idNameCache = (domain == IdCache.USER)
+        ? USER_ID_NAME_CACHE : GROUP_ID_NAME_CACHE;
+      String name;
+      CachedName cachedName = idNameCache.get(id);
+      long now = System.currentTimeMillis();
+      if (cachedName != null && (cachedName.timestamp + cacheTimeout) > now) {
+        name = cachedName.name;
+      } else {
+        name = (domain == IdCache.USER) ? getUserName(id) : getGroupName(id);
+        if (LOG.isDebugEnabled()) {
+          String type = (domain == IdCache.USER) ? "UserName" : "GroupName";
+          LOG.debug("Got " + type + " " + name + " for ID " + id +
+            " from the native implementation");
+        }
+        cachedName = new CachedName(name, now);
+        idNameCache.put(id, cachedName);
+      }
+      return name;
+    }
+
+    static native String getUserName(int uid) throws IOException;
+    static native String getGroupName(int uid) throws IOException;
+
+    private static class CachedName {
+      final long timestamp;
+      final String name;
+
+      public CachedName(String name, long timestamp) {
+        this.name = name;
+        this.timestamp = timestamp;
+      }
+    }
+
+    private static final Map<Integer, CachedName> USER_ID_NAME_CACHE =
+      new ConcurrentHashMap<Integer, CachedName>();
+
+    private static final Map<Integer, CachedName> GROUP_ID_NAME_CACHE =
+      new ConcurrentHashMap<Integer, CachedName>();
+
+    private enum IdCache { USER, GROUP }
+  }
 
-  private static boolean nativeLoaded = false;
   private static boolean workaroundNonThreadSafePasswdCalls = false;
-  private static boolean fadvisePossible = true;
-  private static boolean syncFileRangePossible = true;
 
-  static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY =
-    "hadoop.workaround.non.threadsafe.getpwuid";
-  static final boolean WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT = false;
 
-  private static long cacheTimeout = -1;
+  public static class Windows {
+    // Flags for CreateFile() call on Windows
+    public static final long GENERIC_READ = 0x80000000L;
+    public static final long GENERIC_WRITE = 0x40000000L;
+
+    public static final long FILE_SHARE_READ = 0x00000001L;
+    public static final long FILE_SHARE_WRITE = 0x00000002L;
+    public static final long FILE_SHARE_DELETE = 0x00000004L;
+
+    public static final long CREATE_NEW = 1;
+    public static final long CREATE_ALWAYS = 2;
+    public static final long OPEN_EXISTING = 3;
+    public static final long OPEN_ALWAYS = 4;
+    public static final long TRUNCATE_EXISTING = 5;
+
+    public static final long FILE_BEGIN = 0;
+    public static final long FILE_CURRENT = 1;
+    public static final long FILE_END = 2;
+
+    /** Wrapper around CreateFile() on Windows */
+    public static native FileDescriptor createFile(String path,
+        long desiredAccess, long shareMode, long creationDisposition)
+        throws IOException;
+
+    /** Wrapper around SetFilePointer() on Windows */
+    public static native long setFilePointer(FileDescriptor fd,
+        long distanceToMove, long moveMethod) throws IOException;
+
+    /** Windows only methods used for getOwner() implementation */
+    private static native String getOwner(FileDescriptor fd) throws IOException;
+
+    static {
+      if (NativeCodeLoader.isNativeCodeLoaded()) {
+        try {
+          initNative();
+          nativeLoaded = true;
+        } catch (Throwable t) {
+          // This can happen if the user has an older version of libhadoop.so
+          // installed - in this case we can continue without native IO
+          // after warning
+          LOG.error("Unable to initialize NativeIO libraries", t);
+        }
+      }
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(NativeIO.class);
+
+  private static boolean nativeLoaded = false;
 
   static {
     if (NativeCodeLoader.isNativeCodeLoaded()) {
       try {
-        Configuration conf = new Configuration();
-        workaroundNonThreadSafePasswdCalls = conf.getBoolean(
-          WORKAROUND_NON_THREADSAFE_CALLS_KEY,
-          WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT);
-
         initNative();
         nativeLoaded = true;
-
-        cacheTimeout = conf.getLong(
-          CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_KEY,
-          CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT) *
-          1000;
-        LOG.debug("Initialized cache for IDs to User/Group mapping with a" +
-          " cache timeout of " + cacheTimeout/1000 + " seconds.");
-
       } catch (Throwable t) {
         // This can happen if the user has an older version of libhadoop.so
         // installed - in this case we can continue without native IO
@@ -130,169 +396,161 @@ public class NativeIO {
     return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
   }
 
-  /** Wrapper around open(2) */
-  public static native FileDescriptor open(String path, int flags, int mode) throws IOException;
-  /** Wrapper around fstat(2) */
-  private static native Stat fstat(FileDescriptor fd) throws IOException;
-  /** Wrapper around chmod(2) */
-  public static native void chmod(String path, int mode) throws IOException;
-
-  /** Wrapper around posix_fadvise(2) */
-  static native void posix_fadvise(
-    FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;
-
-  /** Wrapper around sync_file_range(2) */
-  static native void sync_file_range(
-    FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;
-
   /** Initialize the JNI method ID and class ID cache */
   private static native void initNative();
 
-  /**
-   * Call posix_fadvise on the given file descriptor. See the manpage
-   * for this syscall for more information. On systems where this
-   * call is not available, does nothing.
-   *
-   * @throws NativeIOException if there is an error with the syscall
-   */
-  public static void posixFadviseIfPossible(
-      FileDescriptor fd, long offset, long len, int flags)
-      throws NativeIOException {
-    if (nativeLoaded && fadvisePossible) {
-      try {
-        posix_fadvise(fd, offset, len, flags);
-      } catch (UnsupportedOperationException uoe) {
-        fadvisePossible = false;
-      } catch (UnsatisfiedLinkError ule) {
-        fadvisePossible = false;
-      }
+  private static class CachedUid {
+    final long timestamp;
+    final String username;
+    public CachedUid(String username, long timestamp) {
+      this.timestamp = timestamp;
+      this.username = username;
     }
   }
-
-  /**
-   * Call sync_file_range on the given file descriptor. See the manpage
-   * for this syscall for more information. On systems where this
-   * call is not available, does nothing.
-   *
-   * @throws NativeIOException if there is an error with the syscall
-   */
-  public static void syncFileRangeIfPossible(
-      FileDescriptor fd, long offset, long nbytes, int flags)
-      throws NativeIOException {
-    if (nativeLoaded && syncFileRangePossible) {
-      try {
-        sync_file_range(fd, offset, nbytes, flags);
-      } catch (UnsupportedOperationException uoe) {
-        syncFileRangePossible = false;
-      } catch (UnsatisfiedLinkError ule) {
-        syncFileRangePossible = false;
+  private static final Map<Long, CachedUid> uidCache =
+      new ConcurrentHashMap<Long, CachedUid>();
+  private static long cacheTimeout;
+  private static boolean initialized = false;
+
+  public static String getOwner(FileDescriptor fd) throws IOException {
+    ensureInitialized();
+    if (Shell.WINDOWS) {
+      String owner = Windows.getOwner(fd);
+      int i = owner.indexOf('\\');
+      if (i != -1)
+        owner = owner.substring(i + 1);
+      return owner;
+    } else {
+      long uid = POSIX.getUIDforFDOwnerforOwner(fd);
+      CachedUid cUid = uidCache.get(uid);
+      long now = System.currentTimeMillis();
+      if (cUid != null && (cUid.timestamp + cacheTimeout) > now) {
+        return cUid.username;
       }
+      String user = POSIX.getUserName(uid);
+      LOG.info("Got UserName " + user + " for UID " + uid
+          + " from the native implementation");
+      cUid = new CachedUid(user, now);
+      uidCache.put(uid, cUid);
+      return user;
     }
   }
 
   /**
-   * Result type of the fstat call
+   * Create a FileInputStream that shares delete permission on the
+   * file opened, i.e. other process can delete the file the
+   * FileInputStream is reading. Only Windows implementation uses
+   * the native interface.
    */
-  public static class Stat {
-    private int ownerId, groupId;
-    private String owner, group;
-    private int mode;
-
-    // Mode constants
-    public static final int S_IFMT = 0170000;      /* type of file */
-    public static final int   S_IFIFO  = 0010000;  /* named pipe (fifo) */
-    public static final int   S_IFCHR  = 0020000;  /* character special */
-    public static final int   S_IFDIR  = 0040000;  /* directory */
-    public static final int   S_IFBLK  = 0060000;  /* block special */
-    public static final int   S_IFREG  = 0100000;  /* regular */
-    public static final int   S_IFLNK  = 0120000;  /* symbolic link */
-    public static final int   S_IFSOCK = 0140000;  /* socket */
-    public static final int   S_IFWHT  = 0160000;  /* whiteout */
-    public static final int S_ISUID = 0004000;  /* set user id on execution */
-    public static final int S_ISGID = 0002000;  /* set group id on execution */
-    public static final int S_ISVTX = 0001000;  /* save swapped text even after use */
-    public static final int S_IRUSR = 0000400;  /* read permission, owner */
-    public static final int S_IWUSR = 0000200;  /* write permission, owner */
-    public static final int S_IXUSR = 0000100;  /* execute/search permission, owner */
-
-    Stat(int ownerId, int groupId, int mode) {
-      this.ownerId = ownerId;
-      this.groupId = groupId;
-      this.mode = mode;
-    }
-
-    @Override
-    public String toString() {
-      return "Stat(owner='" + owner + "', group='" + group + "'" +
-        ", mode=" + mode + ")";
-    }
-
-    public String getOwner() {
-      return owner;
-    }
-    public String getGroup() {
-      return group;
-    }
-    public int getMode() {
-      return mode;
+  public static FileInputStream getShareDeleteFileInputStream(File f)
+      throws IOException {
+    if (!Shell.WINDOWS) {
+      // On Linux the default FileInputStream shares delete permission
+      // on the file opened.
+      //
+      return new FileInputStream(f);
+    } else {
+      // Use Windows native interface to create a FileInputStream that
+      // shares delete permission on the file opened.
+      //
+      FileDescriptor fd = Windows.createFile(
+          f.getAbsolutePath(),
+          Windows.GENERIC_READ,
+          Windows.FILE_SHARE_READ |
+              Windows.FILE_SHARE_WRITE |
+              Windows.FILE_SHARE_DELETE,
+          Windows.OPEN_EXISTING);
+      return new FileInputStream(fd);
     }
   }
 
-  static native String getUserName(int uid) throws IOException;
-
-  static native String getGroupName(int uid) throws IOException;
-
-  private static class CachedName {
-    final long timestamp;
-    final String name;
-
-    public CachedName(String name, long timestamp) {
-      this.name = name;
-      this.timestamp = timestamp;
+  /**
+   * Create a FileInputStream that shares delete permission on the
+   * file opened at a given offset, i.e. other process can delete
+   * the file the FileInputStream is reading. Only Windows implementation
+   * uses the native interface.
+   */
+  public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
+      throws IOException {
+    if (!Shell.WINDOWS) {
+      RandomAccessFile rf = new RandomAccessFile(f, "r");
+      if (seekOffset > 0) {
+        rf.seek(seekOffset);
+      }
+      return new FileInputStream(rf.getFD());
+    } else {
+      // Use Windows native interface to create a FileInputStream that
+      // shares delete permission on the file opened, and set it to the
+      // given offset.
+      //
+      FileDescriptor fd = NativeIO.Windows.createFile(
+          f.getAbsolutePath(),
+          NativeIO.Windows.GENERIC_READ,
+          NativeIO.Windows.FILE_SHARE_READ |
+              NativeIO.Windows.FILE_SHARE_WRITE |
+              NativeIO.Windows.FILE_SHARE_DELETE,
+          NativeIO.Windows.OPEN_EXISTING);
+      if (seekOffset > 0)
+        NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
+      return new FileInputStream(fd);
     }
   }
 
-  private static final Map<Integer, CachedName> USER_ID_NAME_CACHE =
-    new ConcurrentHashMap<Integer, CachedName>();
-
-  private static final Map<Integer, CachedName> GROUP_ID_NAME_CACHE =
-    new ConcurrentHashMap<Integer, CachedName>();
-
-  private enum IdCache { USER, GROUP }
-
-  private static String getName(IdCache domain, int id) throws IOException {
-    Map<Integer, CachedName> idNameCache = (domain == IdCache.USER)
-      ? USER_ID_NAME_CACHE : GROUP_ID_NAME_CACHE;
-    String name;
-    CachedName cachedName = idNameCache.get(id);
-    long now = System.currentTimeMillis();
-    if (cachedName != null && (cachedName.timestamp + cacheTimeout) > now) {
-      name = cachedName.name;
+  /**
+   * Create the specified File for write access, ensuring that it does not exist.
+   * @param f the file that we want to create
+   * @param permissions we want to have on the file (if security is enabled)
+   *
+   * @throws AlreadyExistsException if the file already exists
+   * @throws IOException if any other error occurred
+   */
+  public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions)
+      throws IOException {
+    if (!Shell.WINDOWS) {
+      // Use the native wrapper around open(2)
+      try {
+        FileDescriptor fd = NativeIO.POSIX.open(f.getAbsolutePath(),
+            NativeIO.POSIX.O_WRONLY | NativeIO.POSIX.O_CREAT
+                | NativeIO.POSIX.O_EXCL, permissions);
+        return new FileOutputStream(fd);
+      } catch (NativeIOException nioe) {
+        if (nioe.getErrno() == Errno.EEXIST) {
+          throw new AlreadyExistsException(nioe);
+        }
+        throw nioe;
+      }
     } else {
-      name = (domain == IdCache.USER) ? getUserName(id) : getGroupName(id);
-      if (LOG.isDebugEnabled()) {
-        String type = (domain == IdCache.USER) ? "UserName" : "GroupName";
-        LOG.debug("Got " + type + " " + name + " for ID " + id +
-          " from the native implementation");
+      // Use the Windows native APIs to create equivalent FileOutputStream
+      try {
+        FileDescriptor fd = NativeIO.Windows.createFile(f.getCanonicalPath(),
+            NativeIO.Windows.GENERIC_WRITE,
+            NativeIO.Windows.FILE_SHARE_DELETE
+                | NativeIO.Windows.FILE_SHARE_READ
+                | NativeIO.Windows.FILE_SHARE_WRITE,
+            NativeIO.Windows.CREATE_NEW);
+        NativeIO.POSIX.chmod(f.getCanonicalPath(), permissions);
+        return new FileOutputStream(fd);
+      } catch (NativeIOException nioe) {
+        if (nioe.getErrorCode() == 80) {
+          // ERROR_FILE_EXISTS
+          // 80 (0x50)
+          // The file exists
+          throw new AlreadyExistsException(nioe);
+        }
+        throw nioe;
       }
-      cachedName = new CachedName(name, now);
-      idNameCache.put(id, cachedName);
     }
-    return name;
   }
 
-  /**
-   * Returns the file stat for a file descriptor.
-   *
-   * @param fd file descriptor.
-   * @return the file descriptor file stat.
-   * @throws IOException thrown if there was an IO error while obtaining the file stat.
-   */
-  public static Stat getFstat(FileDescriptor fd) throws IOException {
-    Stat stat = fstat(fd);
-    stat.owner = getName(IdCache.USER, stat.ownerId);
-    stat.group = getName(IdCache.GROUP, stat.groupId);
-    return stat;
+  private synchronized static void ensureInitialized() {
+    if (!initialized) {
+      cacheTimeout = new Configuration().getLong(
+          CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_KEY,
+          CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT) * 1000;
+      LOG.info("Initialized cache for UID to User mapping with a cache" +
+          " timeout of " + cacheTimeout/1000 + " seconds.");
+      initialized = true;
+    }
   }
   
   /**

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java Thu May 23 20:41:53 2013
@@ -18,20 +18,40 @@
 package org.apache.hadoop.io.nativeio;
 
 import java.io.IOException;
+import org.apache.hadoop.util.Shell;
+
 
 /**
  * An exception generated by a call to the native IO code.
  *
- * These exceptions simply wrap <i>errno</i> result codes.
+ * These exceptions simply wrap <i>errno</i> result codes on Linux,
+ * or the System Error Code on Windows.
  */
 public class NativeIOException extends IOException {
   private static final long serialVersionUID = 1L;
 
   private Errno errno;
 
+  // Java has no unsigned primitive error code. Use a signed 32-bit
+  // integer to hold the unsigned 32-bit integer.
+  private int errorCode;
+
   public NativeIOException(String msg, Errno errno) {
     super(msg);
     this.errno = errno;
+    // Errno-based exceptions carry no Windows system error code, so
+    // record ERROR_SUCCESS (0), i.e. "no Windows failure".
+    this.errorCode = 0;
+  }
+
+  public NativeIOException(String msg, int errorCode) {
+    super(msg);
+    this.errorCode = errorCode;
+    this.errno = Errno.UNKNOWN;
+  }
+
+  /** Windows system error code, widened to long as an unsigned value. */
+  public long getErrorCode() {
+    return errorCode & 0xFFFFFFFFL; // undo sign extension of the int field
   }
 
   public Errno getErrno() {
@@ -40,8 +60,10 @@ public class NativeIOException extends I
 
   @Override
   public String toString() {
-    return errno.toString() + ": " + super.getMessage();
+    if (Shell.WINDOWS)
+      return errorCode + ": " + super.getMessage();
+    else
+      return errno.toString() + ": " + super.getMessage();
   }
 }
 
-

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java Thu May 23 20:41:53 2013
@@ -138,10 +138,12 @@ public class MetricsServlet extends Http
    */
   void printMap(PrintWriter out, Map<String, Map<String, List<TagsMetricsPair>>> map) {
     for (Map.Entry<String, Map<String, List<TagsMetricsPair>>> context : map.entrySet()) {
-      out.println(context.getKey());
+      out.print(context.getKey());
+      out.print("\n");
       for (Map.Entry<String, List<TagsMetricsPair>> record : context.getValue().entrySet()) {
         indent(out, 1);
-        out.println(record.getKey());
+        out.print(record.getKey());
+        out.print("\n");
         for (TagsMetricsPair pair : record.getValue()) {
           indent(out, 2);
           // Prints tag values in the form "{key=value,key=value}:"
@@ -157,7 +159,7 @@ public class MetricsServlet extends Http
             out.print("=");
             out.print(tagValue.getValue().toString());
           }
-          out.println("}:");
+          out.print("}:\n");
           
           // Now print metric values, one per line
           for (Map.Entry<String, Number> metricValue : 
@@ -165,7 +167,8 @@ public class MetricsServlet extends Http
             indent(out, 3);
             out.print(metricValue.getKey());
             out.print("=");
-            out.println(metricValue.getValue().toString());
+            out.print(metricValue.getValue().toString());
+            out.print("\n");
           }
         }
       }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ShellBasedUnixGroupsMapping.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ShellBasedUnixGroupsMapping.java?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ShellBasedUnixGroupsMapping.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ShellBasedUnixGroupsMapping.java Thu May 23 20:41:53 2013
@@ -86,7 +86,8 @@ public class ShellBasedUnixGroupsMapping
       LOG.warn("got exception trying to get groups for user " + user, e);
     }
     
-    StringTokenizer tokenizer = new StringTokenizer(result);
+    StringTokenizer tokenizer =
+        new StringTokenizer(result, Shell.TOKEN_SEPARATOR_REGEX);
     List<String> groups = new LinkedList<String>();
     while (tokenizer.hasMoreTokens()) {
       groups.add(tokenizer.nextToken());

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java Thu May 23 20:41:53 2013
@@ -60,7 +60,7 @@ class NativeCrc32 {
         fileName, basePos);
   }
   
-  private static native void nativeVerifyChunkedSums(
+    private static native void nativeVerifyChunkedSums(
       int bytesPerSum, int checksumType,
       ByteBuffer sums, int sumsOffset,
       ByteBuffer data, int dataOffset, int dataLength,

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java Thu May 23 20:41:53 2013
@@ -21,6 +21,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -44,46 +45,208 @@ abstract public class Shell {
   
   public static final Log LOG = LogFactory.getLog(Shell.class);
   
+  private static final boolean IS_JAVA7_OR_ABOVE = // whole-string compare: "9"/"11" are shorter than 3 chars
+      System.getProperty("java.version").compareTo("1.7") >= 0;
+
+  public static boolean isJava7OrAbove() {
+    return IS_JAVA7_OR_ABOVE;
+  }
+
   /** a Unix command to get the current user's name */
   public final static String USER_NAME_COMMAND = "whoami";
+
+  /** Windows CreateProcess synchronization object */
+  public static final Object WindowsProcessLaunchLock = new Object();
+
   /** a Unix command to get the current user's groups list */
   public static String[] getGroupsCommand() {
-    return new String[]{"bash", "-c", "groups"};
+    return (WINDOWS)? new String[]{"cmd", "/c", "groups"}
+                    : new String[]{"bash", "-c", "groups"};
   }
+
   /** a Unix command to get a given user's groups list */
   public static String[] getGroupsForUserCommand(final String user) {
     //'groups username' command return is non-consistent across different unixes
-    return new String [] {"bash", "-c", "id -Gn " + user};
+    return (WINDOWS)? new String[] { WINUTILS, "groups", "-F", "\"" + user + "\""}
+                    : new String [] {"bash", "-c", "id -Gn " + user};
   }
+
   /** a Unix command to get a given netgroup's user list */
   public static String[] getUsersForNetgroupCommand(final String netgroup) {
     //'groups username' command return is non-consistent across different unixes
-    return new String [] {"bash", "-c", "getent netgroup " + netgroup};
+    return (WINDOWS)? new String [] {"cmd", "/c", "getent netgroup " + netgroup}
+                    : new String [] {"bash", "-c", "getent netgroup " + netgroup};
+  }
+
+  /** Return a command to get permission information. */
+  public static String[] getGetPermissionCommand() {
+    return (WINDOWS) ? new String[] { WINUTILS, "ls", "-F" }
+                     : new String[] { "/bin/ls", "-ld" };
+  }
+
+  /** Return a command to set permission */
+  public static String[] getSetPermissionCommand(String perm, boolean recursive) {
+    if (recursive) {
+      return (WINDOWS) ? new String[] { WINUTILS, "chmod", "-R", perm }
+                         : new String[] { "chmod", "-R", perm };
+    } else {
+      return (WINDOWS) ? new String[] { WINUTILS, "chmod", perm }
+                       : new String[] { "chmod", perm };
+    }
+  }
+
+  /**
+   * Return a command to set permission for specific file.
+   * 
+   * @param perm String permission to set
+   * @param recursive boolean true to apply to all sub-directories recursively
+   * @param file String file to set
+   * @return String[] containing command and arguments
+   */
+  public static String[] getSetPermissionCommand(String perm, boolean recursive,
+                                                 String file) {
+    String[] baseCmd = getSetPermissionCommand(perm, recursive);
+    String[] cmdWithFile = Arrays.copyOf(baseCmd, baseCmd.length + 1);
+    cmdWithFile[cmdWithFile.length - 1] = file;
+    return cmdWithFile;
+  }
+
+  /** Return a command to set owner */
+  public static String[] getSetOwnerCommand(String owner) {
+    return (WINDOWS) ? new String[] { WINUTILS, "chown", "\"" + owner + "\"" }
+                     : new String[] { "chown", owner };
+  }
+  
+  /** Return a command to create symbolic links */
+  public static String[] getSymlinkCommand(String target, String link) {
+    return WINDOWS ? new String[] { WINUTILS, "symlink", link, target }
+                   : new String[] { "ln", "-s", target, link };
   }
+
   /** a Unix command to set permission */
   public static final String SET_PERMISSION_COMMAND = "chmod";
   /** a Unix command to set owner */
   public static final String SET_OWNER_COMMAND = "chown";
+
+  /** a Unix command to change a file's group ownership */
   public static final String SET_GROUP_COMMAND = "chgrp";
   /** a Unix command to create a link */
   public static final String LINK_COMMAND = "ln";
   /** a Unix command to get a link target */
   public static final String READ_LINK_COMMAND = "readlink";
-  /** Return a Unix command to get permission information. */
-  public static String[] getGET_PERMISSION_COMMAND() {
-    //force /bin/ls, except on windows.
-    return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};
-  }
 
   /**Time after which the executing script would be timedout*/
   protected long timeOutInterval = 0L;
   /** If or not script timed out*/
   private AtomicBoolean timedOut;
 
+
+  /** Centralized logic to discover and validate the sanity of the Hadoop 
+   *  home directory. Returns either NULL or a directory that exists and 
+   *  was specified via either -Dhadoop.home.dir or the HADOOP_HOME ENV 
+   *  variable.  This does a lot of work so it should only be called 
+   *  privately for initialization once per process.
+   **/
+  private static String checkHadoopHome() {
+
+    // first check the Dflag hadoop.home.dir with JVM scope
+    String home = System.getProperty("hadoop.home.dir");
+
+    // fall back to the system/user-global env variable
+    if (home == null) {
+      home = System.getenv("HADOOP_HOME");
+    }
+
+    try {
+       // couldn't find either setting for hadoop's home directory
+       if (home == null) {
+         throw new IOException("HADOOP_HOME or hadoop.home.dir are not set.");
+       }
+
+       if (home.startsWith("\"") && home.endsWith("\"")) {
+         home = home.substring(1, home.length()-1);
+       }
+
+       // check that the home setting is actually a directory that exists
+       File homedir = new File(home);
+       if (!homedir.isAbsolute() || !homedir.exists() || !homedir.isDirectory()) {
+         throw new IOException("Hadoop home directory " + homedir
+           + " does not exist, is not a directory, or is not an absolute path.");
+       }
+
+       home = homedir.getCanonicalPath();
+
+    } catch (IOException ioe) {
+       LOG.error("Failed to detect a valid hadoop home directory", ioe);
+       home = null;
+    }
+    
+    return home;
+  }
+  private static String HADOOP_HOME_DIR = checkHadoopHome();
+
+  // Public getter, throws an exception if HADOOP_HOME failed validation
+  // checks and is being referenced downstream.
+  public static final String getHadoopHome() throws IOException {
+    if (HADOOP_HOME_DIR == null) {
+      throw new IOException("Misconfigured HADOOP_HOME cannot be referenced.");
+    }
+
+    return HADOOP_HOME_DIR;
+  }
+
+  /** Fully qualify the path to a binary that should be in a known Hadoop
+   *  bin location. This disambiguates call-outs to executable
+   *  sub-components of Hadoop from other executables on the PATH.
+   *  Caveat: this call doesn't just format the path to the bin directory;
+   *  it also checks that the composed path exists, so callers should cache
+   *  the output.
+   *  @throws IOException if HADOOP_HOME failed validation or the file is missing
+   */
+  public static final String getQualifiedBinPath(String executable) 
+  throws IOException {
+    // getHadoopHome() fails fast instead of probing "null/bin/<executable>"
+    String fullExeName = getHadoopHome() + File.separator + "bin" 
+      + File.separator + executable;
+
+    File exeFile = new File(fullExeName);
+    if (!exeFile.exists()) {
+      throw new IOException("Could not locate executable " + fullExeName
+        + " in the Hadoop binaries.");
+    }
+
+    return exeFile.getCanonicalPath();
+  }
+
   /** Set to true on Windows platforms */
   public static final boolean WINDOWS /* borrowed from Path.WINDOWS */
                 = System.getProperty("os.name").startsWith("Windows");
+
+  public static final boolean LINUX
+                = System.getProperty("os.name").startsWith("Linux");
   
+  /** a Windows utility to emulate Unix commands */
+  public static final String WINUTILS = getWinUtilsPath();
+
+  public static final String getWinUtilsPath() {
+    String winUtilsPath = null;
+
+    try {
+      if (WINDOWS) {
+        winUtilsPath = getQualifiedBinPath("winutils.exe");
+      }
+    } catch (IOException ioe) {
+       LOG.error("Failed to locate the winutils binary in the hadoop binary path",
+         ioe);
+    }
+
+    return winUtilsPath;
+  }
+
+  /** Token separator character class used to parse Shell tool outputs; as a StringTokenizer delimiter set, '[' and ']' also split tokens */
+  public static final String TOKEN_SEPARATOR_REGEX
+                = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
+
   private long    interval;   // refresh interval in msec
   private long    lastTime;   // last time the command was performed
   private Map<String, String> environment; // env for the command execution
@@ -144,7 +307,19 @@ abstract public class Shell {
       builder.directory(this.dir);
     }
     
-    process = builder.start();
+    if (Shell.WINDOWS) {
+      synchronized (WindowsProcessLaunchLock) {
+        // To workaround the race condition issue with child processes
+        // inheriting unintended handles during process launch that can
+        // lead to hangs on reading output and error streams, we
+        // serialize process creation. More info available at:
+        // http://support.microsoft.com/kb/315939
+        process = builder.start();
+      }
+    } else {
+      process = builder.start();
+    }
+
     if (timeOutInterval > 0) {
       timeOutTimer = new Timer("Shell command timeout");
       timeoutTimerTask = new ShellTimeoutTimerTask(

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java Thu May 23 20:41:53 2013
@@ -30,13 +30,17 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.StringTokenizer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang.SystemUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Shell;
 
 import com.google.common.net.InetAddresses;
 
@@ -53,6 +57,27 @@ public class StringUtils {
   public static final int SHUTDOWN_HOOK_PRIORITY = 0;
 
   /**
+   * Shell environment variables: $ followed by one letter or _ followed by
+   * multiple letters, numbers, or underscores.  The group captures the
+   * environment variable name without the leading $.
+   */
+  public static final Pattern SHELL_ENV_VAR_PATTERN =
+    Pattern.compile("\\$([A-Za-z_]{1}[A-Za-z0-9_]*)");
+
+  /**
+   * Windows environment variables: surrounded by %.  The group captures the
+   * environment variable name without the leading and trailing %.
+   */
+  public static final Pattern WIN_ENV_VAR_PATTERN = Pattern.compile("%(.*?)%");
+
+  /**
+   * Regular expression that matches and captures environment variable names
+   * according to platform-specific rules.
+   */
+  public static final Pattern ENV_VAR_PATTERN = Shell.WINDOWS ?
+    WIN_ENV_VAR_PATTERN : SHELL_ENV_VAR_PATTERN;
+
+  /**
    * Make a string representation of the exception.
    * @param e The exception to stringify
    * @return A string with exception name and call stack.
@@ -800,6 +825,28 @@ public class StringUtils {
   }
 
   /**
+   * Concatenates strings, using a separator.
+   *
+   * @param separator to join with
+   * @param strings to join
+   * @return  the joined string
+   */
+  public static String join(CharSequence separator, String[] strings) {
+    // Ideally we don't have to duplicate the code here if array is iterable.
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (String s : strings) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append(separator);
+      }
+      sb.append(s);
+    }
+    return sb.toString();
+  }
+
+  /**
    * Convert SOME_STUFF to SomeStuff
    *
    * @param s input string
@@ -814,4 +861,37 @@ public class StringUtils {
 
     return sb.toString();
   }
+
+  /**
+   * Matches a template string against a pattern, replaces matched tokens with
+   * the supplied replacements, and returns the result.  The regular expression
+   * must use a capturing group.  The value of the first capturing group is used
+   * to look up the replacement.  If no replacement is found for the token, then
+   * it is replaced with the empty string.
+   * 
+   * For example, assume template is "%foo%_%bar%_%baz%", pattern is "%(.*?)%",
+   * and replacements contains 2 entries, mapping "foo" to "zoo" and "baz" to
+   * "zaz".  The result returned would be "zoo__zaz".
+   * 
+   * @param template String template to receive replacements
+   * @param pattern Pattern to match for identifying tokens, must use a capturing
+   *   group
+   * @param replacements Map<String, String> mapping tokens identified by the
+   *   capturing group to their replacement values
+   * @return String template with replacements
+   */
+  public static String replaceTokens(String template, Pattern pattern,
+      Map<String, String> replacements) {
+    StringBuffer sb = new StringBuffer();
+    Matcher matcher = pattern.matcher(template);
+    while (matcher.find()) {
+      String replacement = replacements.get(matcher.group(1));
+      if (replacement == null) {
+        replacement = "";
+      }
+      matcher.appendReplacement(sb, Matcher.quoteReplacement(replacement));
+    }
+    matcher.appendTail(sb);
+    return sb.toString();
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/overview.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/overview.html?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/overview.html (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/overview.html Thu May 23 20:41:53 2013
@@ -60,9 +60,7 @@ that process vast amounts of data. Here'
     Hadoop was been demonstrated on GNU/Linux clusters with 2000 nodes.
   </li>
   <li>
-    Win32 is supported as a <i>development</i> platform. Distributed operation 
-    has not been well tested on Win32, so this is not a <i>production</i> 
-    platform.
+    Windows is also a supported platform.
   </li>  
 </ul>
   
@@ -84,15 +82,6 @@ that process vast amounts of data. Here'
   </li>
 </ol>
 
-<h4>Additional requirements for Windows</h4>
-
-<ol>
-  <li>
-    <a href="http://www.cygwin.com/">Cygwin</a> - Required for shell support in 
-    addition to the required software above.
-  </li>
-</ol>
-  
 <h3>Installing Required Software</h3>
 
 <p>If your platform does not have the required software listed above, you
@@ -104,13 +93,6 @@ $ sudo apt-get install ssh<br>
 $ sudo apt-get install rsync<br>
 </pre></blockquote></p>
 
-<p>On Windows, if you did not install the required software when you
-installed cygwin, start the cygwin installer and select the packages:</p>
-<ul>
-  <li>openssh - the "Net" category</li>
-  <li>rsync - the "Net" category</li>
-</ul>
-
 <h2>Getting Started</h2>
 
 <p>First, you need to get a copy of the Hadoop code.</p>

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c Thu May 23 20:41:53 2013
@@ -16,10 +16,14 @@
  * limitations under the License.
  */
 
-#include "config.h"
+
 #include "org_apache_hadoop.h"
 #include "org_apache_hadoop_io_compress_lz4_Lz4Compressor.h"
 
+#ifdef UNIX
+#include "config.h"
+#endif // UNIX
+
 //****************************
 // Simple Functions
 //****************************
@@ -61,6 +65,9 @@ JNIEXPORT void JNICALL Java_org_apache_h
 
 JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_compressBytesDirect
 (JNIEnv *env, jobject thisj){
+  const char* uncompressed_bytes;
+  char *compressed_bytes;
+
   // Get members of Lz4Compressor
   jobject clazz = (*env)->GetStaticObjectField(env, thisj, Lz4Compressor_clazz);
   jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf);
@@ -70,7 +77,7 @@ JNIEXPORT jint JNICALL Java_org_apache_h
 
   // Get the input direct buffer
   LOCK_CLASS(env, clazz, "Lz4Compressor");
-  const char* uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+  uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
   UNLOCK_CLASS(env, clazz, "Lz4Compressor");
 
   if (uncompressed_bytes == 0) {
@@ -79,7 +86,7 @@ JNIEXPORT jint JNICALL Java_org_apache_h
 
   // Get the output direct buffer
   LOCK_CLASS(env, clazz, "Lz4Compressor");
-  char* compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+  compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
   UNLOCK_CLASS(env, clazz, "Lz4Compressor");
 
   if (compressed_bytes == 0) {

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c Thu May 23 20:41:53 2013
@@ -16,10 +16,13 @@
  * limitations under the License.
  */
 
-#include "config.h"
 #include "org_apache_hadoop.h"
 #include "org_apache_hadoop_io_compress_lz4_Lz4Decompressor.h"
 
+#ifdef UNIX
+#include "config.h"
+#endif // UNIX
+
 int LZ4_uncompress_unknownOutputSize(const char* source, char* dest, int isize, int maxOutputSize);
 
 /*
@@ -58,6 +61,9 @@ JNIEXPORT void JNICALL Java_org_apache_h
 
 JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_decompressBytesDirect
 (JNIEnv *env, jobject thisj){
+  const char *compressed_bytes;
+  char *uncompressed_bytes;
+
   // Get members of Lz4Decompressor
   jobject clazz = (*env)->GetStaticObjectField(env,thisj, Lz4Decompressor_clazz);
   jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_compressedDirectBuf);
@@ -67,7 +73,7 @@ JNIEXPORT jint JNICALL Java_org_apache_h
 
   // Get the input direct buffer
   LOCK_CLASS(env, clazz, "Lz4Decompressor");
-  const char* compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+  compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
   UNLOCK_CLASS(env, clazz, "Lz4Decompressor");
 
   if (compressed_bytes == 0) {
@@ -76,7 +82,7 @@ JNIEXPORT jint JNICALL Java_org_apache_h
 
   // Get the output direct buffer
   LOCK_CLASS(env, clazz, "Lz4Decompressor");
-  char* uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+  uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
   UNLOCK_CLASS(env, clazz, "Lz4Decompressor");
 
   if (uncompressed_bytes == 0) {

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c Thu May 23 20:41:53 2013
@@ -16,12 +16,18 @@
  * limitations under the License.
  */
 
-#include <dlfcn.h>
+
+#if defined HADOOP_SNAPPY_LIBRARY
+
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 
+#ifdef UNIX
+#include <dlfcn.h>
 #include "config.h"
+#endif // UNIX
+
 #include "org_apache_hadoop_io_compress_snappy.h"
 #include "org_apache_hadoop_io_compress_snappy_SnappyCompressor.h"
 
@@ -81,7 +87,7 @@ JNIEXPORT jint JNICALL Java_org_apache_h
   UNLOCK_CLASS(env, clazz, "SnappyCompressor");
 
   if (uncompressed_bytes == 0) {
-    return 0;
+    return (jint)0;
   }
 
   // Get the output direct buffer
@@ -90,7 +96,7 @@ JNIEXPORT jint JNICALL Java_org_apache_h
   UNLOCK_CLASS(env, clazz, "SnappyCompressor");
 
   if (compressed_bytes == 0) {
-    return 0;
+    return (jint)0;
   }
 
   /* size_t should always be 4 bytes or larger. */
@@ -109,3 +115,5 @@ JNIEXPORT jint JNICALL Java_org_apache_h
   (*env)->SetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen, 0);
   return (jint)buf_len;
 }
+
+#endif //define HADOOP_SNAPPY_LIBRARY

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c Thu May 23 20:41:53 2013
@@ -16,12 +16,18 @@
  * limitations under the License.
  */
 
-#include <dlfcn.h>
+
+#if defined HADOOP_SNAPPY_LIBRARY
+
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 
+#ifdef UNIX
 #include "config.h"
+#include <dlfcn.h>
+#endif
+
 #include "org_apache_hadoop_io_compress_snappy.h"
 #include "org_apache_hadoop_io_compress_snappy_SnappyDecompressor.h"
 
@@ -103,3 +109,5 @@ JNIEXPORT jint JNICALL Java_org_apache_h
 
   return (jint)uncompressed_direct_buf_len;
 }
+
+#endif //define HADOOP_SNAPPY_LIBRARY

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c Thu May 23 20:41:53 2013
@@ -16,12 +16,15 @@
  * limitations under the License.
  */
 
-#include <dlfcn.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 
+#ifdef UNIX
+#include <dlfcn.h>
 #include "config.h"
+#endif
+
 #include "org_apache_hadoop_io_compress_zlib.h"
 #include "org_apache_hadoop_io_compress_zlib_ZlibCompressor.h"
 
@@ -35,48 +38,124 @@ static jfieldID ZlibCompressor_directBuf
 static jfieldID ZlibCompressor_finish;
 static jfieldID ZlibCompressor_finished;
 
+#ifdef UNIX
 static int (*dlsym_deflateInit2_)(z_streamp, int, int, int, int, int, const char *, int);
 static int (*dlsym_deflate)(z_streamp, int);
 static int (*dlsym_deflateSetDictionary)(z_streamp, const Bytef *, uInt);
 static int (*dlsym_deflateReset)(z_streamp);
 static int (*dlsym_deflateEnd)(z_streamp);
+#endif
+
+#ifdef WINDOWS
+#include <Strsafe.h>
+typedef int (__cdecl *__dlsym_deflateInit2_) (z_streamp, int, int, int, int, int, const char *, int);
+typedef int (__cdecl *__dlsym_deflate) (z_streamp, int);
+typedef int (__cdecl *__dlsym_deflateSetDictionary) (z_streamp, const Bytef *, uInt);
+typedef int (__cdecl *__dlsym_deflateReset) (z_streamp);
+typedef int (__cdecl *__dlsym_deflateEnd) (z_streamp);
+static __dlsym_deflateInit2_ dlsym_deflateInit2_;
+static __dlsym_deflate dlsym_deflate;
+static __dlsym_deflateSetDictionary dlsym_deflateSetDictionary;
+static __dlsym_deflateReset dlsym_deflateReset;
+static __dlsym_deflateEnd dlsym_deflateEnd;
+
+// Try to load zlib.dll from the dir where hadoop.dll is located, else fall back to the default DLL search.
+HANDLE LoadZlibTryHadoopNativeDir() {
+  HMODULE libz = NULL;
+  PCWSTR HADOOP_DLL = L"hadoop.dll";
+  size_t HADOOP_DLL_LEN = 10;
+  WCHAR path[MAX_PATH] = { 0 };
+  BOOL isPathValid = FALSE;
+
+  // Get hadoop.dll full path
+  HMODULE hModule = GetModuleHandle(HADOOP_DLL);
+  if (hModule != NULL) {
+    if (GetModuleFileName(hModule, path, MAX_PATH) > 0) {
+      size_t size = 0;
+      if (StringCchLength(path, MAX_PATH, &size) == S_OK) {
+
+        // size_t is unsigned: check before subtracting so it cannot wrap around
+        if (size >= HADOOP_DLL_LEN) {
+          size = size - HADOOP_DLL_LEN;
+          path[size] = L'\0';
+          if (StringCchCat(path, MAX_PATH, HADOOP_ZLIB_LIBRARY) == S_OK) {
+            isPathValid = TRUE;
+          }
+        }
+      }
+    }
+  }
+
+  if (isPathValid) {
+    libz = LoadLibrary(path);
+  }
+
+  // fallback to system paths
+  if (!libz) {
+    libz = LoadLibrary(HADOOP_ZLIB_LIBRARY);
+  }
+
+  return libz;
+}
+#endif
 
 JNIEXPORT void JNICALL
 Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_initIDs(
 	JNIEnv *env, jclass class
 	) {
+#ifdef UNIX
 	// Load libz.so
 	void *libz = dlopen(HADOOP_ZLIB_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
-	if (!libz) {
+  if (!libz) {
 		THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load libz.so");
 	  	return;
 	}
+#endif
+
+#ifdef WINDOWS
+  HMODULE libz = LoadZlibTryHadoopNativeDir();
+
+  if (!libz) {
+		THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zlib1.dll");
+    return;
+	}
+#endif
 
+#ifdef UNIX
 	// Locate the requisite symbols from libz.so
 	dlerror();                                 // Clear any existing error
-	LOAD_DYNAMIC_SYMBOL(dlsym_deflateInit2_, env, libz, "deflateInit2_");
-	LOAD_DYNAMIC_SYMBOL(dlsym_deflate, env, libz, "deflate");
-	LOAD_DYNAMIC_SYMBOL(dlsym_deflateSetDictionary, env, libz, "deflateSetDictionary");
-	LOAD_DYNAMIC_SYMBOL(dlsym_deflateReset, env, libz, "deflateReset");
-	LOAD_DYNAMIC_SYMBOL(dlsym_deflateEnd, env, libz, "deflateEnd");
+  LOAD_DYNAMIC_SYMBOL(dlsym_deflateInit2_, env, libz, "deflateInit2_");
+  LOAD_DYNAMIC_SYMBOL(dlsym_deflate, env, libz, "deflate");
+  LOAD_DYNAMIC_SYMBOL(dlsym_deflateSetDictionary, env, libz, "deflateSetDictionary");
+  LOAD_DYNAMIC_SYMBOL(dlsym_deflateReset, env, libz, "deflateReset");
+  LOAD_DYNAMIC_SYMBOL(dlsym_deflateEnd, env, libz, "deflateEnd");
+#endif
+
+#ifdef WINDOWS
+  LOAD_DYNAMIC_SYMBOL(__dlsym_deflateInit2_, dlsym_deflateInit2_, env, libz, "deflateInit2_");
+	LOAD_DYNAMIC_SYMBOL(__dlsym_deflate, dlsym_deflate, env, libz, "deflate");
+	LOAD_DYNAMIC_SYMBOL(__dlsym_deflateSetDictionary, dlsym_deflateSetDictionary, env, libz, "deflateSetDictionary");
+	LOAD_DYNAMIC_SYMBOL(__dlsym_deflateReset, dlsym_deflateReset, env, libz, "deflateReset");
+	LOAD_DYNAMIC_SYMBOL(__dlsym_deflateEnd, dlsym_deflateEnd, env, libz, "deflateEnd");
+#endif
 
 	// Initialize the requisite fieldIds
-    ZlibCompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", 
+    ZlibCompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
                                                       "Ljava/lang/Class;");
     ZlibCompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
     ZlibCompressor_finish = (*env)->GetFieldID(env, class, "finish", "Z");
     ZlibCompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
-    ZlibCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class, 
-    									"uncompressedDirectBuf", 
+    ZlibCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class,
+        "uncompressedDirectBuf",
     									"Ljava/nio/Buffer;");
-    ZlibCompressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, class, 
+    ZlibCompressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, class,
     										"uncompressedDirectBufOff", "I");
-    ZlibCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, class, 
+    ZlibCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, class,
     										"uncompressedDirectBufLen", "I");
-    ZlibCompressor_compressedDirectBuf = (*env)->GetFieldID(env, class, 
-    									"compressedDirectBuf", 
+    ZlibCompressor_compressedDirectBuf = (*env)->GetFieldID(env, class,
+                      "compressedDirectBuf",
     									"Ljava/nio/Buffer;");
-    ZlibCompressor_directBufferSize = (*env)->GetFieldID(env, class, 
+    ZlibCompressor_directBufferSize = (*env)->GetFieldID(env, class,
     										"directBufferSize", "I");
 }
 
@@ -84,7 +163,9 @@ JNIEXPORT jlong JNICALL
 Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_init(
 	JNIEnv *env, jclass class, jint level, jint strategy, jint windowBits
 	) {
-	// Create a z_stream
+    int rv = 0;
+    static const int memLevel = 8; 							// See zconf.h
+	  // Create a z_stream
     z_stream *stream = malloc(sizeof(z_stream));
     if (!stream) {
 		THROW(env, "java/lang/OutOfMemoryError", NULL);
@@ -93,17 +174,16 @@ Java_org_apache_hadoop_io_compress_zlib_
     memset((void*)stream, 0, sizeof(z_stream));
 
 	// Initialize stream
-	static const int memLevel = 8; 							// See zconf.h
-    int rv = (*dlsym_deflateInit2_)(stream, level, Z_DEFLATED, windowBits,
+    rv = (*dlsym_deflateInit2_)(stream, level, Z_DEFLATED, windowBits,
     			memLevel, strategy, ZLIB_VERSION, sizeof(z_stream));
-    			
+
     if (rv != Z_OK) {
 	    // Contingency - Report error by throwing appropriate exceptions
 	    free(stream);
 	    stream = NULL;
-	
+
 		switch (rv) {
-			case Z_MEM_ERROR: 
+			case Z_MEM_ERROR:
 			    {
 		    		THROW(env, "java/lang/OutOfMemoryError", NULL);
 			    }
@@ -120,27 +200,28 @@ Java_org_apache_hadoop_io_compress_zlib_
 		    break;
 	    }
 	}
-	
+
     return JLONG(stream);
 }
 
 JNIEXPORT void JNICALL
 Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_setDictionary(
-	JNIEnv *env, jclass class, jlong stream, 
+	JNIEnv *env, jclass class, jlong stream,
 	jarray b, jint off, jint len
 	) {
+    int rv = 0;
     Bytef *buf = (*env)->GetPrimitiveArrayCritical(env, b, 0);
     if (!buf) {
         return;
     }
-    int rv = dlsym_deflateSetDictionary(ZSTREAM(stream), buf + off, len);
+    rv = dlsym_deflateSetDictionary(ZSTREAM(stream), buf + off, len);
     (*env)->ReleasePrimitiveArrayCritical(env, b, buf, 0);
-    
+
     if (rv != Z_OK) {
     	// Contingency - Report error by throwing appropriate exceptions
 	    switch (rv) {
 		    case Z_STREAM_ERROR:
-			{	
+			{
 		    	THROW(env, "java/lang/IllegalArgumentException", NULL);
 			}
 			break;
@@ -157,75 +238,85 @@ JNIEXPORT jint JNICALL
 Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_deflateBytesDirect(
 	JNIEnv *env, jobject this
 	) {
+    jobject clazz = NULL;
+    jobject uncompressed_direct_buf = NULL;
+    jint uncompressed_direct_buf_off = 0;
+    jint uncompressed_direct_buf_len = 0;
+    jobject compressed_direct_buf = NULL;
+    jint compressed_direct_buf_len = 0;
+    jboolean finish;
+    Bytef* uncompressed_bytes = NULL;
+    Bytef* compressed_bytes = NULL;
+    int rv = 0;
+    jint no_compressed_bytes = 0;
 	// Get members of ZlibCompressor
     z_stream *stream = ZSTREAM(
-    						(*env)->GetLongField(env, this, 
+                (*env)->GetLongField(env, this,
     									ZlibCompressor_stream)
     					);
     if (!stream) {
 		THROW(env, "java/lang/NullPointerException", NULL);
 		return (jint)0;
-    } 
+    }
 
     // Get members of ZlibCompressor
-    jobject clazz = (*env)->GetStaticObjectField(env, this, 
+    clazz = (*env)->GetStaticObjectField(env, this,
                                                  ZlibCompressor_clazz);
-	jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this, 
+	uncompressed_direct_buf = (*env)->GetObjectField(env, this,
 									ZlibCompressor_uncompressedDirectBuf);
-	jint uncompressed_direct_buf_off = (*env)->GetIntField(env, this, 
+	uncompressed_direct_buf_off = (*env)->GetIntField(env, this,
 									ZlibCompressor_uncompressedDirectBufOff);
-	jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this, 
+	uncompressed_direct_buf_len = (*env)->GetIntField(env, this,
 									ZlibCompressor_uncompressedDirectBufLen);
 
-	jobject compressed_direct_buf = (*env)->GetObjectField(env, this, 
+	compressed_direct_buf = (*env)->GetObjectField(env, this,
 									ZlibCompressor_compressedDirectBuf);
-	jint compressed_direct_buf_len = (*env)->GetIntField(env, this, 
+	compressed_direct_buf_len = (*env)->GetIntField(env, this,
 									ZlibCompressor_directBufferSize);
 
-	jboolean finish = (*env)->GetBooleanField(env, this, ZlibCompressor_finish);
+	finish = (*env)->GetBooleanField(env, this, ZlibCompressor_finish);
 
     // Get the input direct buffer
     LOCK_CLASS(env, clazz, "ZlibCompressor");
-	Bytef* uncompressed_bytes = (*env)->GetDirectBufferAddress(env, 
+    uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
 											uncompressed_direct_buf);
     UNLOCK_CLASS(env, clazz, "ZlibCompressor");
-    
+
   	if (uncompressed_bytes == 0) {
     	return (jint)0;
 	}
-	
+
     // Get the output direct buffer
     LOCK_CLASS(env, clazz, "ZlibCompressor");
-	Bytef* compressed_bytes = (*env)->GetDirectBufferAddress(env, 
+    compressed_bytes = (*env)->GetDirectBufferAddress(env,
 										compressed_direct_buf);
     UNLOCK_CLASS(env, clazz, "ZlibCompressor");
 
   	if (compressed_bytes == 0) {
 		return (jint)0;
 	}
-	
+
 	// Re-calibrate the z_stream
   	stream->next_in = uncompressed_bytes + uncompressed_direct_buf_off;
   	stream->next_out = compressed_bytes;
   	stream->avail_in = uncompressed_direct_buf_len;
-	stream->avail_out = compressed_direct_buf_len;
-	
+    stream->avail_out = compressed_direct_buf_len;
+
 	// Compress
-	int rv = dlsym_deflate(stream, finish ? Z_FINISH : Z_NO_FLUSH);
+	rv = dlsym_deflate(stream, finish ? Z_FINISH : Z_NO_FLUSH);
 
-	jint no_compressed_bytes = 0;
 	switch (rv) {
     	// Contingency? - Report error by throwing appropriate exceptions
   		case Z_STREAM_END:
   		{
   			(*env)->SetBooleanField(env, this, ZlibCompressor_finished, JNI_TRUE);
   		} // cascade
-	  	case Z_OK: 
+      case Z_OK:
 	  	{
 	  		uncompressed_direct_buf_off += uncompressed_direct_buf_len - stream->avail_in;
-			(*env)->SetIntField(env, this, 
+			(*env)->SetIntField(env, this,
 						ZlibCompressor_uncompressedDirectBufOff, uncompressed_direct_buf_off);
-			(*env)->SetIntField(env, this, 
+			(*env)->SetIntField(env, this,
 						ZlibCompressor_uncompressedDirectBufLen, stream->avail_in);
 			no_compressed_bytes = compressed_direct_buf_len - stream->avail_out;
 	  	}
@@ -238,7 +329,7 @@ Java_org_apache_hadoop_io_compress_zlib_
 		}
 		break;
   	}
-  	
+
   	return no_compressed_bytes;
 }
 

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c Thu May 23 20:41:53 2013
@@ -16,12 +16,15 @@
  * limitations under the License.
  */
 
-#include <dlfcn.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 
+#ifdef UNIX
+#include <dlfcn.h>
 #include "config.h"
+#endif
+
 #include "org_apache_hadoop_io_compress_zlib.h"
 #include "org_apache_hadoop_io_compress_zlib_ZlibDecompressor.h"
 
@@ -35,48 +38,88 @@ static jfieldID ZlibDecompressor_directB
 static jfieldID ZlibDecompressor_needDict;
 static jfieldID ZlibDecompressor_finished;
 
+#ifdef UNIX
 static int (*dlsym_inflateInit2_)(z_streamp, int, const char *, int);
 static int (*dlsym_inflate)(z_streamp, int);
 static int (*dlsym_inflateSetDictionary)(z_streamp, const Bytef *, uInt);
 static int (*dlsym_inflateReset)(z_streamp);
 static int (*dlsym_inflateEnd)(z_streamp);
+#endif
+
+#ifdef WINDOWS
+#include <Strsafe.h>
+typedef int (__cdecl *__dlsym_inflateInit2_)(z_streamp, int, const char *, int);
+typedef int (__cdecl *__dlsym_inflate)(z_streamp, int);
+typedef int (__cdecl *__dlsym_inflateSetDictionary)(z_streamp, const Bytef *, uInt);
+typedef int (__cdecl *__dlsym_inflateReset)(z_streamp);
+typedef int (__cdecl *__dlsym_inflateEnd)(z_streamp);
+static __dlsym_inflateInit2_ dlsym_inflateInit2_;
+static __dlsym_inflate dlsym_inflate;
+static __dlsym_inflateSetDictionary dlsym_inflateSetDictionary;
+static __dlsym_inflateReset dlsym_inflateReset;
+static __dlsym_inflateEnd dlsym_inflateEnd;
+extern HANDLE LoadZlibTryHadoopNativeDir();
+#endif
 
 JNIEXPORT void JNICALL
 Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_initIDs(
-	JNIEnv *env, jclass class
+JNIEnv *env, jclass class
 	) {
 	// Load libz.so
-    void *libz = dlopen(HADOOP_ZLIB_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+#ifdef UNIX
+  void *libz = dlopen(HADOOP_ZLIB_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
 	if (!libz) {
 	  THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load libz.so");
 	  return;
-	} 
+	}
+#endif
+
+#ifdef WINDOWS
+  HMODULE libz = LoadZlibTryHadoopNativeDir();
+
+	if (!libz) {
+	  THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zlib1.dll");
+	  return;
+	}
+#endif
+
 
 	// Locate the requisite symbols from libz.so
+#ifdef UNIX
 	dlerror();                                 // Clear any existing error
 	LOAD_DYNAMIC_SYMBOL(dlsym_inflateInit2_, env, libz, "inflateInit2_");
 	LOAD_DYNAMIC_SYMBOL(dlsym_inflate, env, libz, "inflate");
 	LOAD_DYNAMIC_SYMBOL(dlsym_inflateSetDictionary, env, libz, "inflateSetDictionary");
 	LOAD_DYNAMIC_SYMBOL(dlsym_inflateReset, env, libz, "inflateReset");
 	LOAD_DYNAMIC_SYMBOL(dlsym_inflateEnd, env, libz, "inflateEnd");
+#endif
+
+#ifdef WINDOWS
+	LOAD_DYNAMIC_SYMBOL(__dlsym_inflateInit2_, dlsym_inflateInit2_, env, libz, "inflateInit2_");
+	LOAD_DYNAMIC_SYMBOL(__dlsym_inflate, dlsym_inflate, env, libz, "inflate");
+	LOAD_DYNAMIC_SYMBOL(__dlsym_inflateSetDictionary, dlsym_inflateSetDictionary, env, libz, "inflateSetDictionary");
+	LOAD_DYNAMIC_SYMBOL(__dlsym_inflateReset, dlsym_inflateReset, env, libz, "inflateReset");
+	LOAD_DYNAMIC_SYMBOL(__dlsym_inflateEnd, dlsym_inflateEnd, env, libz, "inflateEnd");
+#endif
+
 
-	// Initialize the requisite fieldIds
-    ZlibDecompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", 
+  // Initialize the requisite fieldIds
+    ZlibDecompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
                                                       "Ljava/lang/Class;");
     ZlibDecompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
     ZlibDecompressor_needDict = (*env)->GetFieldID(env, class, "needDict", "Z");
     ZlibDecompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
-    ZlibDecompressor_compressedDirectBuf = (*env)->GetFieldID(env, class, 
-    											"compressedDirectBuf", 
+    ZlibDecompressor_compressedDirectBuf = (*env)->GetFieldID(env, class,
+                          "compressedDirectBuf",
     											"Ljava/nio/Buffer;");
-    ZlibDecompressor_compressedDirectBufOff = (*env)->GetFieldID(env, class, 
+    ZlibDecompressor_compressedDirectBufOff = (*env)->GetFieldID(env, class,
     										"compressedDirectBufOff", "I");
-    ZlibDecompressor_compressedDirectBufLen = (*env)->GetFieldID(env, class, 
+    ZlibDecompressor_compressedDirectBufLen = (*env)->GetFieldID(env, class,
     										"compressedDirectBufLen", "I");
-    ZlibDecompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class, 
-    											"uncompressedDirectBuf", 
+    ZlibDecompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class,
+                          "uncompressedDirectBuf",
     											"Ljava/nio/Buffer;");
-    ZlibDecompressor_directBufferSize = (*env)->GetFieldID(env, class, 
+    ZlibDecompressor_directBufferSize = (*env)->GetFieldID(env, class,
     											"directBufferSize", "I");
 }
 
@@ -84,21 +127,22 @@ JNIEXPORT jlong JNICALL
 Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_init(
 	JNIEnv *env, jclass cls, jint windowBits
 	) {
+    int rv = 0;
     z_stream *stream = malloc(sizeof(z_stream));
     memset((void*)stream, 0, sizeof(z_stream));
 
     if (stream == 0) {
 		THROW(env, "java/lang/OutOfMemoryError", NULL);
 		return (jlong)0;
-    } 
-    
-    int rv = dlsym_inflateInit2_(stream, windowBits, ZLIB_VERSION, sizeof(z_stream));
+    }
+
+    rv = dlsym_inflateInit2_(stream, windowBits, ZLIB_VERSION, sizeof(z_stream));
 
 	if (rv != Z_OK) {
 	    // Contingency - Report error by throwing appropriate exceptions
 		free(stream);
 		stream = NULL;
-		
+
 		switch (rv) {
 		 	case Z_MEM_ERROR:
 		 	{
@@ -112,7 +156,7 @@ Java_org_apache_hadoop_io_compress_zlib_
 	  		break;
 		}
 	}
-	
+
 	return JLONG(stream);
 }
 
@@ -121,21 +165,22 @@ Java_org_apache_hadoop_io_compress_zlib_
 	JNIEnv *env, jclass cls, jlong stream,
 	jarray b, jint off, jint len
 	) {
+    int rv = 0;
     Bytef *buf = (*env)->GetPrimitiveArrayCritical(env, b, 0);
     if (!buf) {
 		THROW(env, "java/lang/InternalError", NULL);
         return;
     }
-    int rv = dlsym_inflateSetDictionary(ZSTREAM(stream), buf + off, len);
+    rv = dlsym_inflateSetDictionary(ZSTREAM(stream), buf + off, len);
     (*env)->ReleasePrimitiveArrayCritical(env, b, buf, 0);
-    
+
     if (rv != Z_OK) {
 	    // Contingency - Report error by throwing appropriate exceptions
 		switch (rv) {
 		    case Z_STREAM_ERROR:
 	    	case Z_DATA_ERROR:
 			{
-				THROW(env, "java/lang/IllegalArgumentException", 
+				THROW(env, "java/lang/IllegalArgumentException",
 					(ZSTREAM(stream))->msg);
 			}
 			break;
@@ -152,62 +197,71 @@ JNIEXPORT jint JNICALL
 Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_inflateBytesDirect(
 	JNIEnv *env, jobject this
 	) {
+    jobject clazz = NULL;
+    jarray compressed_direct_buf = NULL;
+    jint compressed_direct_buf_off = 0;
+    jint compressed_direct_buf_len = 0;
+    jarray uncompressed_direct_buf = NULL;
+    jint uncompressed_direct_buf_len = 0;
+    Bytef *compressed_bytes = NULL;
+    Bytef *uncompressed_bytes = NULL;
+    int rv = 0;
+    int no_decompressed_bytes = 0;
 	// Get members of ZlibDecompressor
     z_stream *stream = ZSTREAM(
-    						(*env)->GetLongField(env, this, 
+                (*env)->GetLongField(env, this,
     									ZlibDecompressor_stream)
     					);
     if (!stream) {
 		THROW(env, "java/lang/NullPointerException", NULL);
 		return (jint)0;
-    } 
+    }
 
     // Get members of ZlibDecompressor
-    jobject clazz = (*env)->GetStaticObjectField(env, this, 
+    clazz = (*env)->GetStaticObjectField(env, this,
                                                  ZlibDecompressor_clazz);
-	jarray compressed_direct_buf = (jarray)(*env)->GetObjectField(env, this, 
+	compressed_direct_buf = (jarray)(*env)->GetObjectField(env, this,
 											ZlibDecompressor_compressedDirectBuf);
-	jint compressed_direct_buf_off = (*env)->GetIntField(env, this, 
+	compressed_direct_buf_off = (*env)->GetIntField(env, this,
 									ZlibDecompressor_compressedDirectBufOff);
-	jint compressed_direct_buf_len = (*env)->GetIntField(env, this, 
+	compressed_direct_buf_len = (*env)->GetIntField(env, this,
 									ZlibDecompressor_compressedDirectBufLen);
 
-	jarray uncompressed_direct_buf = (jarray)(*env)->GetObjectField(env, this, 
+	uncompressed_direct_buf = (jarray)(*env)->GetObjectField(env, this,
 											ZlibDecompressor_uncompressedDirectBuf);
-	jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this, 
+	uncompressed_direct_buf_len = (*env)->GetIntField(env, this,
 										ZlibDecompressor_directBufferSize);
 
     // Get the input direct buffer
     LOCK_CLASS(env, clazz, "ZlibDecompressor");
-	Bytef *compressed_bytes = (*env)->GetDirectBufferAddress(env, 
+	compressed_bytes = (*env)->GetDirectBufferAddress(env,
 										compressed_direct_buf);
     UNLOCK_CLASS(env, clazz, "ZlibDecompressor");
-    
+
 	if (!compressed_bytes) {
 	    return (jint)0;
 	}
-	
+
     // Get the output direct buffer
     LOCK_CLASS(env, clazz, "ZlibDecompressor");
-	Bytef *uncompressed_bytes = (*env)->GetDirectBufferAddress(env, 
+	uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
 											uncompressed_direct_buf);
     UNLOCK_CLASS(env, clazz, "ZlibDecompressor");
 
 	if (!uncompressed_bytes) {
 	    return (jint)0;
 	}
-	
+
 	// Re-calibrate the z_stream
 	stream->next_in  = compressed_bytes + compressed_direct_buf_off;
 	stream->next_out = uncompressed_bytes;
 	stream->avail_in  = compressed_direct_buf_len;
 	stream->avail_out = uncompressed_direct_buf_len;
-	
+
 	// Decompress
-	int rv = dlsym_inflate(stream, Z_PARTIAL_FLUSH);
+	rv = dlsym_inflate(stream, Z_PARTIAL_FLUSH);
 
 	// Contingency? - Report error by throwing appropriate exceptions
-	int no_decompressed_bytes = 0;	
 	switch (rv) {
 		case Z_STREAM_END:
 		{
@@ -216,9 +270,9 @@ Java_org_apache_hadoop_io_compress_zlib_
 		case Z_OK:
 		{
 		    compressed_direct_buf_off += compressed_direct_buf_len - stream->avail_in;
-		    (*env)->SetIntField(env, this, ZlibDecompressor_compressedDirectBufOff, 
+		    (*env)->SetIntField(env, this, ZlibDecompressor_compressedDirectBufOff,
 		    			compressed_direct_buf_off);
-		    (*env)->SetIntField(env, this, ZlibDecompressor_compressedDirectBufLen, 
+		    (*env)->SetIntField(env, this, ZlibDecompressor_compressedDirectBufLen,
 		    			stream->avail_in);
 		    no_decompressed_bytes = uncompressed_direct_buf_len - stream->avail_out;
 		}
@@ -227,9 +281,9 @@ Java_org_apache_hadoop_io_compress_zlib_
 		{
 		    (*env)->SetBooleanField(env, this, ZlibDecompressor_needDict, JNI_TRUE);
 		    compressed_direct_buf_off += compressed_direct_buf_len - stream->avail_in;
-		    (*env)->SetIntField(env, this, ZlibDecompressor_compressedDirectBufOff, 
+		    (*env)->SetIntField(env, this, ZlibDecompressor_compressedDirectBufOff,
 		    			compressed_direct_buf_off);
-		    (*env)->SetIntField(env, this, ZlibDecompressor_compressedDirectBufLen, 
+		    (*env)->SetIntField(env, this, ZlibDecompressor_compressedDirectBufLen,
 		    			stream->avail_in);
 		}
 		break;
@@ -251,7 +305,7 @@ Java_org_apache_hadoop_io_compress_zlib_
 		}
 		break;
     }
-    
+
     return no_decompressed_bytes;
 }
 
@@ -299,4 +353,3 @@ Java_org_apache_hadoop_io_compress_zlib_
 /**
  * vim: sw=2: ts=2: et:
  */
-

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/org_apache_hadoop_io_compress_zlib.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/org_apache_hadoop_io_compress_zlib.h?rev=1485845&r1=1485844&r2=1485845&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/org_apache_hadoop_io_compress_zlib.h (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/org_apache_hadoop_io_compress_zlib.h Thu May 23 20:41:53 2013
@@ -19,14 +19,23 @@
 #if !defined ORG_APACHE_HADOOP_IO_COMPRESS_ZLIB_ZLIB_H
 #define ORG_APACHE_HADOOP_IO_COMPRESS_ZLIB_ZLIB_H
 
-#include <dlfcn.h>
-#include <jni.h>
+#include "org_apache_hadoop.h"
+
+#ifdef UNIX
+#include <config.h>
 #include <stddef.h>
-#include <zconf.h>
 #include <zlib.h>
+#include <zconf.h>
+#include <dlfcn.h>
+#include <jni.h>
+#endif
 
-#include "config.h"
-#include "org_apache_hadoop.h"
+#ifdef WINDOWS
+#include <jni.h>
+#define HADOOP_ZLIB_LIBRARY L"zlib1.dll"
+#include <zlib.h>
+#include <zconf.h>
+#endif
 
 /* A helper macro to convert the java 'stream-handle' to a z_stream pointer. */
 #define ZSTREAM(stream) ((z_stream*)((ptrdiff_t)(stream)))



Mime
View raw message