hadoop-common-commits mailing list archives

From t...@apache.org
Subject svn commit: r1463203 [3/7] - in /hadoop/common/branches/HDFS-347/hadoop-common-project: hadoop-annotations/ hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ hadoop-common/ hadoop-common/dev-support/ hadoop-common/src/ hadoop-co...
Date Mon, 01 Apr 2013 16:47:27 GMT
Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java Mon Apr  1 16:47:16 2013
@@ -23,108 +23,156 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
-import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
-import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
 import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
 import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
+import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
 
 /**
- * This class provides CompressionOutputStream and CompressionInputStream for
- * compression and decompression. Currently we dont have an implementation of
- * the Compressor and Decompressor interfaces, so those methods of
- * CompressionCodec which have a Compressor or Decompressor type argument, throw
- * UnsupportedOperationException.
+ * This class provides output and input streams for bzip2 compression
+ * and decompression.  It uses the native bzip2 library on the system
+ * if possible, else it uses a pure-Java implementation of the bzip2
+ * algorithm.  The configuration parameter
+ * io.compression.codec.bzip2.library can be used to control this
+ * behavior.
+ *
+ * In the pure-Java mode, the Compressor and Decompressor interfaces
+ * are not implemented.  Therefore, in that mode, those methods of
+ * CompressionCodec which have a Compressor or Decompressor type
+ * argument, throw UnsupportedOperationException.
+ *
+ * Currently, support for splittability is available only in the
+ * pure-Java mode; therefore, if a SplitCompressionInputStream is
+ * requested, the pure-Java implementation is used, regardless of the
+ * setting of the configuration parameter mentioned above.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class BZip2Codec implements SplittableCompressionCodec {
+public class BZip2Codec implements Configurable, SplittableCompressionCodec {
 
   private static final String HEADER = "BZ";
   private static final int HEADER_LEN = HEADER.length();
   private static final String SUB_HEADER = "h9";
   private static final int SUB_HEADER_LEN = SUB_HEADER.length();
 
+  private Configuration conf;
+  
   /**
-  * Creates a new instance of BZip2Codec
+   * Set the configuration to be used by this object.
+   *
+   * @param conf the configuration object.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+  
+  /**
+   * Return the configuration used by this object.
+   *
+   * @return the configuration object used by this object.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  /**
+  * Creates a new instance of BZip2Codec.
   */
   public BZip2Codec() { }
 
   /**
-  * Creates CompressionOutputStream for BZip2
-  *
-  * @param out
-  *            The output Stream
-  * @return The BZip2 CompressionOutputStream
-  * @throws java.io.IOException
-  *             Throws IO exception
-  */
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream}.
+   *
+   * @param out        the location for the final output stream
+   * @return a stream the user can write uncompressed data to, to have it 
+   *         compressed
+   * @throws IOException
+   */
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out)
       throws IOException {
-    return new BZip2CompressionOutputStream(out);
+    return createOutputStream(out, createCompressor());
   }
 
   /**
-  * Creates a compressor using given OutputStream.
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream} with the given {@link Compressor}.
    *
-  * @return CompressionOutputStream
-    @throws java.io.IOException
+   * @param out        the location for the final output stream
+   * @param compressor compressor to use
+   * @return a stream the user can write uncompressed data to, to have it 
+   *         compressed
+   * @throws IOException
    */
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out,
       Compressor compressor) throws IOException {
-    return createOutputStream(out);
+    return Bzip2Factory.isNativeBzip2Loaded(conf) ?
+      new CompressorStream(out, compressor, 
+                           conf.getInt("io.file.buffer.size", 4*1024)) :
+      new BZip2CompressionOutputStream(out);
   }
 
   /**
-  * This functionality is currently not supported.
-  *
-  * @return BZip2DummyCompressor.class
-  */
+   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of compressor needed by this codec.
+   */
   @Override
-  public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
-    return BZip2DummyCompressor.class;
+  public Class<? extends Compressor> getCompressorType() {
+    return Bzip2Factory.getBzip2CompressorType(conf);
   }
 
   /**
-  * This functionality is currently not supported.
-  *
-  * @return Compressor
-  */
+   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new compressor for use by this codec
+   */
   @Override
   public Compressor createCompressor() {
-    return new BZip2DummyCompressor();
+    return Bzip2Factory.getBzip2Compressor(conf);
   }
 
   /**
-  * Creates CompressionInputStream to be used to read off uncompressed data.
-  *
-  * @param in
-  *            The InputStream
-  * @return Returns CompressionInputStream for BZip2
-  * @throws java.io.IOException
-  *             Throws IOException
-  */
+   * Create a {@link CompressionInputStream} that will read from the given
+   * input stream and return a stream for uncompressed data.
+   *
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
   @Override
   public CompressionInputStream createInputStream(InputStream in)
       throws IOException {
-    return new BZip2CompressionInputStream(in);
+    return createInputStream(in, createDecompressor());
   }
 
   /**
-  * This functionality is currently not supported.
-  *
-  * @return CompressionInputStream
-  */
+   * Create a {@link CompressionInputStream} that will read from the given
+   * {@link InputStream} with the given {@link Decompressor}, and return a 
+   * stream for uncompressed data.
+   *
+   * @param in           the stream to read compressed bytes from
+   * @param decompressor decompressor to use
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
   @Override
   public CompressionInputStream createInputStream(InputStream in,
       Decompressor decompressor) throws IOException {
-    return createInputStream(in);
+    return Bzip2Factory.isNativeBzip2Loaded(conf) ? 
+      new DecompressorStream(in, decompressor,
+                             conf.getInt("io.file.buffer.size", 4*1024)) :
+      new BZip2CompressionInputStream(in);
   }
 
   /**
@@ -139,7 +187,6 @@ public class BZip2Codec implements Split
    *
    * @return CompressionInputStream for BZip2 aligned at block boundaries
    */
-  @Override
   public SplitCompressionInputStream createInputStream(InputStream seekableIn,
       Decompressor decompressor, long start, long end, READ_MODE readMode)
       throws IOException {
@@ -184,23 +231,23 @@ public class BZip2Codec implements Split
   }
 
   /**
-  * This functionality is currently not supported.
-  *
-  * @return BZip2DummyDecompressor.class
-  */
+   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of decompressor needed by this codec.
+   */
   @Override
-  public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
-    return BZip2DummyDecompressor.class;
+  public Class<? extends Decompressor> getDecompressorType() {
+    return Bzip2Factory.getBzip2DecompressorType(conf);
   }
 
   /**
-  * This functionality is currently not supported.
-  *
-  * @return Decompressor
-  */
+   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new decompressor for use by this codec
+   */
   @Override
   public Decompressor createDecompressor() {
-    return new BZip2DummyDecompressor();
+    return Bzip2Factory.getBzip2Decompressor(conf);
   }
 
   /**
@@ -236,7 +283,6 @@ public class BZip2Codec implements Split
       }
     }
 
-    @Override
     public void finish() throws IOException {
       if (needsReset) {
         // In the case that nothing is written to this stream, we still need to
@@ -256,14 +302,12 @@ public class BZip2Codec implements Split
       }
     }    
     
-    @Override
     public void resetState() throws IOException {
       // Cannot write to out at this point because out might not be ready
       // yet, as in SequenceFile.Writer implementation.
       needsReset = true;
     }
 
-    @Override
     public void write(int b) throws IOException {
       if (needsReset) {
         internalReset();
@@ -271,7 +315,6 @@ public class BZip2Codec implements Split
       this.output.write(b);
     }
 
-    @Override
     public void write(byte[] b, int off, int len) throws IOException {
       if (needsReset) {
         internalReset();
@@ -279,7 +322,6 @@ public class BZip2Codec implements Split
       this.output.write(b, off, len);
     }
 
-    @Override
     public void close() throws IOException {
       if (needsReset) {
         // In the case that nothing is written to this stream, we still need to
@@ -397,7 +439,6 @@ public class BZip2Codec implements Split
 
     }// end of method
 
-    @Override
     public void close() throws IOException {
       if (!needsReset) {
         input.close();
@@ -433,7 +474,6 @@ public class BZip2Codec implements Split
     *
     */
 
-    @Override
     public int read(byte[] b, int off, int len) throws IOException {
       if (needsReset) {
         internalReset();
@@ -457,7 +497,6 @@ public class BZip2Codec implements Split
 
     }
 
-    @Override
     public int read() throws IOException {
       byte b[] = new byte[1];
       int result = this.read(b, 0, 1);
@@ -472,7 +511,6 @@ public class BZip2Codec implements Split
       }
     }    
     
-    @Override
     public void resetState() throws IOException {
       // Cannot read from bufferedIn at this point because bufferedIn
       // might not be ready
@@ -480,7 +518,6 @@ public class BZip2Codec implements Split
       needsReset = true;
     }
 
-    @Override
     public long getPos() {
       return this.compressedStreamPosition;
       }

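The codec change above is driven entirely by configuration. A minimal usage sketch, assuming the value names "system-native" and "java-builtin" accepted for io.compression.codec.bzip2.library by the new Bzip2Factory (which is not shown in this hunk):

    import java.io.FileOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;

    public class Bzip2CodecSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "java-builtin" forces the pure-Java path (required for splitting);
        // "system-native" (the assumed default) tries the native libbz2 first.
        conf.set("io.compression.codec.bzip2.library", "system-native");
        conf.setInt("io.file.buffer.size", 4 * 1024); // buffer used by the native streams

        BZip2Codec codec = new BZip2Codec();
        codec.setConf(conf); // required now that the codec is Configurable

        CompressionOutputStream out =
            codec.createOutputStream(new FileOutputStream("data.bz2"));
        out.write("hello bzip2".getBytes("UTF-8"));
        out.finish();
        out.close();
      }
    }

Note that setConf() must be called before creating streams or compressors; with a null configuration the Bzip2Factory lookups above would fail.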
Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java Mon Apr  1 16:47:16 2013
@@ -19,7 +19,10 @@ package org.apache.hadoop.io.nativeio;
 
 import java.io.File;
 import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -27,10 +30,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
 import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.Shell;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 /**
  * JNI wrappers for various native IO-related calls not available in Java.
  * These functions should generally be used alongside a fallback to another
@@ -39,81 +45,341 @@ import org.apache.commons.logging.LogFac
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class NativeIO {
-  // Flags for open() call from bits/fcntl.h
-  public static final int O_RDONLY   =    00;
-  public static final int O_WRONLY   =    01;
-  public static final int O_RDWR     =    02;
-  public static final int O_CREAT    =  0100;
-  public static final int O_EXCL     =  0200;
-  public static final int O_NOCTTY   =  0400;
-  public static final int O_TRUNC    = 01000;
-  public static final int O_APPEND   = 02000;
-  public static final int O_NONBLOCK = 04000;
-  public static final int O_SYNC   =  010000;
-  public static final int O_ASYNC  =  020000;
-  public static final int O_FSYNC = O_SYNC;
-  public static final int O_NDELAY = O_NONBLOCK;
-
-  // Flags for posix_fadvise() from bits/fcntl.h
-  /* No further special treatment.  */
-  public static final int POSIX_FADV_NORMAL = 0; 
-  /* Expect random page references.  */
-  public static final int POSIX_FADV_RANDOM = 1; 
-  /* Expect sequential page references.  */
-  public static final int POSIX_FADV_SEQUENTIAL = 2; 
-  /* Will need these pages.  */
-  public static final int POSIX_FADV_WILLNEED = 3; 
-  /* Don't need these pages.  */
-  public static final int POSIX_FADV_DONTNEED = 4; 
-  /* Data will be accessed once.  */
-  public static final int POSIX_FADV_NOREUSE = 5; 
-
-
-  /* Wait upon writeout of all pages
-     in the range before performing the
-     write.  */
-  public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
-  /* Initiate writeout of all those
-     dirty pages in the range which are
-     not presently under writeback.  */
-  public static final int SYNC_FILE_RANGE_WRITE = 2;
-
-  /* Wait upon writeout of all pages in
-     the range after performing the
-     write.  */
-  public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
+  public static class POSIX {
+    // Flags for open() call from bits/fcntl.h
+    public static final int O_RDONLY   =    00;
+    public static final int O_WRONLY   =    01;
+    public static final int O_RDWR     =    02;
+    public static final int O_CREAT    =  0100;
+    public static final int O_EXCL     =  0200;
+    public static final int O_NOCTTY   =  0400;
+    public static final int O_TRUNC    = 01000;
+    public static final int O_APPEND   = 02000;
+    public static final int O_NONBLOCK = 04000;
+    public static final int O_SYNC   =  010000;
+    public static final int O_ASYNC  =  020000;
+    public static final int O_FSYNC = O_SYNC;
+    public static final int O_NDELAY = O_NONBLOCK;
+
+    // Flags for posix_fadvise() from bits/fcntl.h
+    /* No further special treatment.  */
+    public static final int POSIX_FADV_NORMAL = 0;
+    /* Expect random page references.  */
+    public static final int POSIX_FADV_RANDOM = 1;
+    /* Expect sequential page references.  */
+    public static final int POSIX_FADV_SEQUENTIAL = 2;
+    /* Will need these pages.  */
+    public static final int POSIX_FADV_WILLNEED = 3;
+    /* Don't need these pages.  */
+    public static final int POSIX_FADV_DONTNEED = 4;
+    /* Data will be accessed once.  */
+    public static final int POSIX_FADV_NOREUSE = 5;
+
+
+    /* Wait upon writeout of all pages
+       in the range before performing the
+       write.  */
+    public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
+    /* Initiate writeout of all those
+       dirty pages in the range which are
+       not presently under writeback.  */
+    public static final int SYNC_FILE_RANGE_WRITE = 2;
+
+    /* Wait upon writeout of all pages in
+       the range after performing the
+       write.  */
+    public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
+
+    private static final Log LOG = LogFactory.getLog(NativeIO.class);
+
+    private static boolean nativeLoaded = false;
+    private static boolean fadvisePossible = true;
+    private static boolean syncFileRangePossible = true;
+
+    static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY =
+      "hadoop.workaround.non.threadsafe.getpwuid";
+    static final boolean WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT = false;
+
+    private static long cacheTimeout = -1;
+
+    static {
+      if (NativeCodeLoader.isNativeCodeLoaded()) {
+        try {
+          Configuration conf = new Configuration();
+          workaroundNonThreadSafePasswdCalls = conf.getBoolean(
+            WORKAROUND_NON_THREADSAFE_CALLS_KEY,
+            WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT);
+
+          initNative();
+          nativeLoaded = true;
+
+          cacheTimeout = conf.getLong(
+            CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_KEY,
+            CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT) *
+            1000;
+          LOG.debug("Initialized cache for IDs to User/Group mapping with a " +
+            " cache timeout of " + cacheTimeout/1000 + " seconds.");
+
+        } catch (Throwable t) {
+          // This can happen if the user has an older version of libhadoop.so
+          // installed - in this case we can continue without native IO
+          // after warning
+          LOG.error("Unable to initialize NativeIO libraries", t);
+        }
+      }
+    }
 
-  private static final Log LOG = LogFactory.getLog(NativeIO.class);
+    /**
+     * Return true if the JNI-based native IO extensions are available.
+     */
+    public static boolean isAvailable() {
+      return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
+    }
+
+    /** Wrapper around open(2) */
+    public static native FileDescriptor open(String path, int flags, int mode) throws IOException;
+    /** Wrapper around fstat(2) */
+    private static native Stat fstat(FileDescriptor fd) throws IOException;
+
+    /** Native chmod implementation. On UNIX, it is a wrapper around chmod(2) */
+    private static native void chmodImpl(String path, int mode) throws IOException;
+
+    public static void chmod(String path, int mode) throws IOException {
+      if (!Shell.WINDOWS) {
+        chmodImpl(path, mode);
+      } else {
+        try {
+          chmodImpl(path, mode);
+        } catch (NativeIOException nioe) {
+          if (nioe.getErrorCode() == 3) {
+            throw new NativeIOException("No such file or directory",
+                Errno.ENOENT);
+          } else {
+            LOG.warn(String.format("NativeIO.chmod error (%d): %s",
+                nioe.getErrorCode(), nioe.getMessage()));
+            throw new NativeIOException("Unknown error", Errno.UNKNOWN);
+          }
+        }
+      }
+    }
+
+    /** Wrapper around posix_fadvise(2) */
+    static native void posix_fadvise(
+      FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;
+
+    /** Wrapper around sync_file_range(2) */
+    static native void sync_file_range(
+      FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;
+
+    /**
+     * Call posix_fadvise on the given file descriptor. See the manpage
+     * for this syscall for more information. On systems where this
+     * call is not available, does nothing.
+     *
+     * @throws NativeIOException if there is an error with the syscall
+     */
+    public static void posixFadviseIfPossible(
+        FileDescriptor fd, long offset, long len, int flags)
+        throws NativeIOException {
+      if (nativeLoaded && fadvisePossible) {
+        try {
+          posix_fadvise(fd, offset, len, flags);
+        } catch (UnsupportedOperationException uoe) {
+          fadvisePossible = false;
+        } catch (UnsatisfiedLinkError ule) {
+          fadvisePossible = false;
+        }
+      }
+    }
+
+    /**
+     * Call sync_file_range on the given file descriptor. See the manpage
+     * for this syscall for more information. On systems where this
+     * call is not available, does nothing.
+     *
+     * @throws NativeIOException if there is an error with the syscall
+     */
+    public static void syncFileRangeIfPossible(
+        FileDescriptor fd, long offset, long nbytes, int flags)
+        throws NativeIOException {
+      if (nativeLoaded && syncFileRangePossible) {
+        try {
+          sync_file_range(fd, offset, nbytes, flags);
+        } catch (UnsupportedOperationException uoe) {
+          syncFileRangePossible = false;
+        } catch (UnsatisfiedLinkError ule) {
+          syncFileRangePossible = false;
+        }
+      }
+    }
+
+    /** Linux only methods used for getOwner() implementation */
+    private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException;
+    private static native String getUserName(long uid) throws IOException;
+
+    /**
+     * Result type of the fstat call
+     */
+    public static class Stat {
+      private int ownerId, groupId;
+      private String owner, group;
+      private int mode;
+
+      // Mode constants
+      public static final int S_IFMT = 0170000;      /* type of file */
+      public static final int   S_IFIFO  = 0010000;  /* named pipe (fifo) */
+      public static final int   S_IFCHR  = 0020000;  /* character special */
+      public static final int   S_IFDIR  = 0040000;  /* directory */
+      public static final int   S_IFBLK  = 0060000;  /* block special */
+      public static final int   S_IFREG  = 0100000;  /* regular */
+      public static final int   S_IFLNK  = 0120000;  /* symbolic link */
+      public static final int   S_IFSOCK = 0140000;  /* socket */
+      public static final int   S_IFWHT  = 0160000;  /* whiteout */
+      public static final int S_ISUID = 0004000;  /* set user id on execution */
+      public static final int S_ISGID = 0002000;  /* set group id on execution */
+      public static final int S_ISVTX = 0001000;  /* save swapped text even after use */
+      public static final int S_IRUSR = 0000400;  /* read permission, owner */
+      public static final int S_IWUSR = 0000200;  /* write permission, owner */
+      public static final int S_IXUSR = 0000100;  /* execute/search permission, owner */
+
+      Stat(int ownerId, int groupId, int mode) {
+        this.ownerId = ownerId;
+        this.groupId = groupId;
+        this.mode = mode;
+      }
+
+      @Override
+      public String toString() {
+        return "Stat(owner='" + owner + "', group='" + group + "'" +
+          ", mode=" + mode + ")";
+      }
+
+      public String getOwner() {
+        return owner;
+      }
+      public String getGroup() {
+        return group;
+      }
+      public int getMode() {
+        return mode;
+      }
+    }
+
+    /**
+     * Returns the file stat for a file descriptor.
+     *
+     * @param fd file descriptor.
+     * @return the file descriptor file stat.
+     * @throws IOException thrown if there was an IO error while obtaining the file stat.
+     */
+    public static Stat getFstat(FileDescriptor fd) throws IOException {
+      Stat stat = fstat(fd);
+      stat.owner = getName(IdCache.USER, stat.ownerId);
+      stat.group = getName(IdCache.GROUP, stat.groupId);
+      return stat;
+    }
+
+    private static String getName(IdCache domain, int id) throws IOException {
+      Map<Integer, CachedName> idNameCache = (domain == IdCache.USER)
+        ? USER_ID_NAME_CACHE : GROUP_ID_NAME_CACHE;
+      String name;
+      CachedName cachedName = idNameCache.get(id);
+      long now = System.currentTimeMillis();
+      if (cachedName != null && (cachedName.timestamp + cacheTimeout) > now) {
+        name = cachedName.name;
+      } else {
+        name = (domain == IdCache.USER) ? getUserName(id) : getGroupName(id);
+        if (LOG.isDebugEnabled()) {
+          String type = (domain == IdCache.USER) ? "UserName" : "GroupName";
+          LOG.debug("Got " + type + " " + name + " for ID " + id +
+            " from the native implementation");
+        }
+        cachedName = new CachedName(name, now);
+        idNameCache.put(id, cachedName);
+      }
+      return name;
+    }
+
+    static native String getUserName(int uid) throws IOException;
+    static native String getGroupName(int uid) throws IOException;
+
+    private static class CachedName {
+      final long timestamp;
+      final String name;
+
+      public CachedName(String name, long timestamp) {
+        this.name = name;
+        this.timestamp = timestamp;
+      }
+    }
+
+    private static final Map<Integer, CachedName> USER_ID_NAME_CACHE =
+      new ConcurrentHashMap<Integer, CachedName>();
+
+    private static final Map<Integer, CachedName> GROUP_ID_NAME_CACHE =
+      new ConcurrentHashMap<Integer, CachedName>();
+
+    private enum IdCache { USER, GROUP }
+  }
 
-  private static boolean nativeLoaded = false;
   private static boolean workaroundNonThreadSafePasswdCalls = false;
-  private static boolean fadvisePossible = true;
-  private static boolean syncFileRangePossible = true;
 
-  static final String WORKAROUND_NON_THREADSAFE_CALLS_KEY =
-    "hadoop.workaround.non.threadsafe.getpwuid";
-  static final boolean WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT = false;
 
-  private static long cacheTimeout = -1;
+  public static class Windows {
+    // Flags for CreateFile() call on Windows
+    public static final long GENERIC_READ = 0x80000000L;
+    public static final long GENERIC_WRITE = 0x40000000L;
+
+    public static final long FILE_SHARE_READ = 0x00000001L;
+    public static final long FILE_SHARE_WRITE = 0x00000002L;
+    public static final long FILE_SHARE_DELETE = 0x00000004L;
+
+    public static final long CREATE_NEW = 1;
+    public static final long CREATE_ALWAYS = 2;
+    public static final long OPEN_EXISTING = 3;
+    public static final long OPEN_ALWAYS = 4;
+    public static final long TRUNCATE_EXISTING = 5;
+
+    public static final long FILE_BEGIN = 0;
+    public static final long FILE_CURRENT = 1;
+    public static final long FILE_END = 2;
+
+    /** Wrapper around CreateFile() on Windows */
+    public static native FileDescriptor createFile(String path,
+        long desiredAccess, long shareMode, long creationDisposition)
+        throws IOException;
+
+    /** Wrapper around SetFilePointer() on Windows */
+    public static native long setFilePointer(FileDescriptor fd,
+        long distanceToMove, long moveMethod) throws IOException;
+
+    /** Windows only methods used for getOwner() implementation */
+    private static native String getOwner(FileDescriptor fd) throws IOException;
+
+    static {
+      if (NativeCodeLoader.isNativeCodeLoaded()) {
+        try {
+          initNative();
+          nativeLoaded = true;
+        } catch (Throwable t) {
+          // This can happen if the user has an older version of libhadoop.so
+          // installed - in this case we can continue without native IO
+          // after warning
+          LOG.error("Unable to initialize NativeIO libraries", t);
+        }
+      }
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(NativeIO.class);
+
+  private static boolean nativeLoaded = false;
 
   static {
     if (NativeCodeLoader.isNativeCodeLoaded()) {
       try {
-        Configuration conf = new Configuration();
-        workaroundNonThreadSafePasswdCalls = conf.getBoolean(
-          WORKAROUND_NON_THREADSAFE_CALLS_KEY,
-          WORKAROUND_NON_THREADSAFE_CALLS_DEFAULT);
-
         initNative();
         nativeLoaded = true;
-
-        cacheTimeout = conf.getLong(
-          CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_KEY,
-          CommonConfigurationKeys.HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT) *
-          1000;
-        LOG.debug("Initialized cache for IDs to User/Group mapping with a" +
-          " cache timeout of " + cacheTimeout/1000 + " seconds.");
-
       } catch (Throwable t) {
         // This can happen if the user has an older version of libhadoop.so
         // installed - in this case we can continue without native IO
@@ -130,169 +396,161 @@ public class NativeIO {
     return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
   }
 
-  /** Wrapper around open(2) */
-  public static native FileDescriptor open(String path, int flags, int mode) throws IOException;
-  /** Wrapper around fstat(2) */
-  private static native Stat fstat(FileDescriptor fd) throws IOException;
-  /** Wrapper around chmod(2) */
-  public static native void chmod(String path, int mode) throws IOException;
-
-  /** Wrapper around posix_fadvise(2) */
-  static native void posix_fadvise(
-    FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;
-
-  /** Wrapper around sync_file_range(2) */
-  static native void sync_file_range(
-    FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;
-
   /** Initialize the JNI method ID and class ID cache */
   private static native void initNative();
 
-  /**
-   * Call posix_fadvise on the given file descriptor. See the manpage
-   * for this syscall for more information. On systems where this
-   * call is not available, does nothing.
-   *
-   * @throws NativeIOException if there is an error with the syscall
-   */
-  public static void posixFadviseIfPossible(
-      FileDescriptor fd, long offset, long len, int flags)
-      throws NativeIOException {
-    if (nativeLoaded && fadvisePossible) {
-      try {
-        posix_fadvise(fd, offset, len, flags);
-      } catch (UnsupportedOperationException uoe) {
-        fadvisePossible = false;
-      } catch (UnsatisfiedLinkError ule) {
-        fadvisePossible = false;
-      }
+  private static class CachedUid {
+    final long timestamp;
+    final String username;
+    public CachedUid(String username, long timestamp) {
+      this.timestamp = timestamp;
+      this.username = username;
     }
   }
-
-  /**
-   * Call sync_file_range on the given file descriptor. See the manpage
-   * for this syscall for more information. On systems where this
-   * call is not available, does nothing.
-   *
-   * @throws NativeIOException if there is an error with the syscall
-   */
-  public static void syncFileRangeIfPossible(
-      FileDescriptor fd, long offset, long nbytes, int flags)
-      throws NativeIOException {
-    if (nativeLoaded && syncFileRangePossible) {
-      try {
-        sync_file_range(fd, offset, nbytes, flags);
-      } catch (UnsupportedOperationException uoe) {
-        syncFileRangePossible = false;
-      } catch (UnsatisfiedLinkError ule) {
-        syncFileRangePossible = false;
+  private static final Map<Long, CachedUid> uidCache =
+      new ConcurrentHashMap<Long, CachedUid>();
+  private static long cacheTimeout;
+  private static boolean initialized = false;
+
+  public static String getOwner(FileDescriptor fd) throws IOException {
+    ensureInitialized();
+    if (Shell.WINDOWS) {
+      String owner = Windows.getOwner(fd);
+      int i = owner.indexOf('\\');
+      if (i != -1)
+        owner = owner.substring(i + 1);
+      return owner;
+    } else {
+      long uid = POSIX.getUIDforFDOwnerforOwner(fd);
+      CachedUid cUid = uidCache.get(uid);
+      long now = System.currentTimeMillis();
+      if (cUid != null && (cUid.timestamp + cacheTimeout) > now) {
+        return cUid.username;
       }
+      String user = POSIX.getUserName(uid);
+      LOG.info("Got UserName " + user + " for UID " + uid
+          + " from the native implementation");
+      cUid = new CachedUid(user, now);
+      uidCache.put(uid, cUid);
+      return user;
     }
   }
 
   /**
-   * Result type of the fstat call
+   * Create a FileInputStream that shares delete permission on the
+   * file opened, i.e. other processes can delete the file while the
+   * FileInputStream is reading it. Only the Windows implementation
+   * uses the native interface.
    */
-  public static class Stat {
-    private int ownerId, groupId;
-    private String owner, group;
-    private int mode;
-
-    // Mode constants
-    public static final int S_IFMT = 0170000;      /* type of file */
-    public static final int   S_IFIFO  = 0010000;  /* named pipe (fifo) */
-    public static final int   S_IFCHR  = 0020000;  /* character special */
-    public static final int   S_IFDIR  = 0040000;  /* directory */
-    public static final int   S_IFBLK  = 0060000;  /* block special */
-    public static final int   S_IFREG  = 0100000;  /* regular */
-    public static final int   S_IFLNK  = 0120000;  /* symbolic link */
-    public static final int   S_IFSOCK = 0140000;  /* socket */
-    public static final int   S_IFWHT  = 0160000;  /* whiteout */
-    public static final int S_ISUID = 0004000;  /* set user id on execution */
-    public static final int S_ISGID = 0002000;  /* set group id on execution */
-    public static final int S_ISVTX = 0001000;  /* save swapped text even after use */
-    public static final int S_IRUSR = 0000400;  /* read permission, owner */
-    public static final int S_IWUSR = 0000200;  /* write permission, owner */
-    public static final int S_IXUSR = 0000100;  /* execute/search permission, owner */
-
-    Stat(int ownerId, int groupId, int mode) {
-      this.ownerId = ownerId;
-      this.groupId = groupId;
-      this.mode = mode;
-    }
-
-    @Override
-    public String toString() {
-      return "Stat(owner='" + owner + "', group='" + group + "'" +
-        ", mode=" + mode + ")";
-    }
-
-    public String getOwner() {
-      return owner;
-    }
-    public String getGroup() {
-      return group;
-    }
-    public int getMode() {
-      return mode;
+  public static FileInputStream getShareDeleteFileInputStream(File f)
+      throws IOException {
+    if (!Shell.WINDOWS) {
+      // On Linux the default FileInputStream shares delete permission
+      // on the file opened.
+      //
+      return new FileInputStream(f);
+    } else {
+      // Use Windows native interface to create a FileInputStream that
+      // shares delete permission on the file opened.
+      //
+      FileDescriptor fd = Windows.createFile(
+          f.getAbsolutePath(),
+          Windows.GENERIC_READ,
+          Windows.FILE_SHARE_READ |
+              Windows.FILE_SHARE_WRITE |
+              Windows.FILE_SHARE_DELETE,
+          Windows.OPEN_EXISTING);
+      return new FileInputStream(fd);
     }
   }
 
-  static native String getUserName(int uid) throws IOException;
-
-  static native String getGroupName(int uid) throws IOException;
-
-  private static class CachedName {
-    final long timestamp;
-    final String name;
-
-    public CachedName(String name, long timestamp) {
-      this.name = name;
-      this.timestamp = timestamp;
+  /**
+   * Create a FileInputStream that shares delete permission on the
+   * file opened at a given offset, i.e. other processes can delete
+   * the file while the FileInputStream is reading it. Only the Windows
+   * implementation uses the native interface.
+   */
+  public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
+      throws IOException {
+    if (!Shell.WINDOWS) {
+      RandomAccessFile rf = new RandomAccessFile(f, "r");
+      if (seekOffset > 0) {
+        rf.seek(seekOffset);
+      }
+      return new FileInputStream(rf.getFD());
+    } else {
+      // Use Windows native interface to create a FileInputStream that
+      // shares delete permission on the file opened, and set it to the
+      // given offset.
+      //
+      FileDescriptor fd = NativeIO.Windows.createFile(
+          f.getAbsolutePath(),
+          NativeIO.Windows.GENERIC_READ,
+          NativeIO.Windows.FILE_SHARE_READ |
+              NativeIO.Windows.FILE_SHARE_WRITE |
+              NativeIO.Windows.FILE_SHARE_DELETE,
+          NativeIO.Windows.OPEN_EXISTING);
+      if (seekOffset > 0)
+        NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
+      return new FileInputStream(fd);
     }
   }
 
-  private static final Map<Integer, CachedName> USER_ID_NAME_CACHE =
-    new ConcurrentHashMap<Integer, CachedName>();
-
-  private static final Map<Integer, CachedName> GROUP_ID_NAME_CACHE =
-    new ConcurrentHashMap<Integer, CachedName>();
-
-  private enum IdCache { USER, GROUP }
-
-  private static String getName(IdCache domain, int id) throws IOException {
-    Map<Integer, CachedName> idNameCache = (domain == IdCache.USER)
-      ? USER_ID_NAME_CACHE : GROUP_ID_NAME_CACHE;
-    String name;
-    CachedName cachedName = idNameCache.get(id);
-    long now = System.currentTimeMillis();
-    if (cachedName != null && (cachedName.timestamp + cacheTimeout) > now) {
-      name = cachedName.name;
+  /**
+   * Create the specified File for write access, ensuring that it does not exist.
+   * @param f the file that we want to create
+   * @param permissions the permissions to set on the file (if security is enabled)
+   *
+   * @throws AlreadyExistsException if the file already exists
+   * @throws IOException if any other error occurred
+   */
+  public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions)
+      throws IOException {
+    if (!Shell.WINDOWS) {
+      // Use the native wrapper around open(2)
+      try {
+        FileDescriptor fd = NativeIO.POSIX.open(f.getAbsolutePath(),
+            NativeIO.POSIX.O_WRONLY | NativeIO.POSIX.O_CREAT
+                | NativeIO.POSIX.O_EXCL, permissions);
+        return new FileOutputStream(fd);
+      } catch (NativeIOException nioe) {
+        if (nioe.getErrno() == Errno.EEXIST) {
+          throw new AlreadyExistsException(nioe);
+        }
+        throw nioe;
+      }
     } else {
-      name = (domain == IdCache.USER) ? getUserName(id) : getGroupName(id);
-      if (LOG.isDebugEnabled()) {
-        String type = (domain == IdCache.USER) ? "UserName" : "GroupName";
-        LOG.debug("Got " + type + " " + name + " for ID " + id +
-          " from the native implementation");
+      // Use the Windows native APIs to create equivalent FileOutputStream
+      try {
+        FileDescriptor fd = NativeIO.Windows.createFile(f.getCanonicalPath(),
+            NativeIO.Windows.GENERIC_WRITE,
+            NativeIO.Windows.FILE_SHARE_DELETE
+                | NativeIO.Windows.FILE_SHARE_READ
+                | NativeIO.Windows.FILE_SHARE_WRITE,
+            NativeIO.Windows.CREATE_NEW);
+        NativeIO.POSIX.chmod(f.getCanonicalPath(), permissions);
+        return new FileOutputStream(fd);
+      } catch (NativeIOException nioe) {
+        if (nioe.getErrorCode() == 80) {
+          // ERROR_FILE_EXISTS
+          // 80 (0x50)
+          // The file exists
+          throw new AlreadyExistsException(nioe);
+        }
+        throw nioe;
       }
-      cachedName = new CachedName(name, now);
-      idNameCache.put(id, cachedName);
     }
-    return name;
   }
 
-  /**
-   * Returns the file stat for a file descriptor.
-   *
-   * @param fd file descriptor.
-   * @return the file descriptor file stat.
-   * @throws IOException thrown if there was an IO error while obtaining the file stat.
-   */
-  public static Stat getFstat(FileDescriptor fd) throws IOException {
-    Stat stat = fstat(fd);
-    stat.owner = getName(IdCache.USER, stat.ownerId);
-    stat.group = getName(IdCache.GROUP, stat.groupId);
-    return stat;
+  private synchronized static void ensureInitialized() {
+    if (!initialized) {
+      cacheTimeout =
+          new Configuration().getLong("hadoop.security.uid.cache.secs",
+              4*60*60) * 1000;
+      LOG.info("Initialized cache for UID to User mapping with a cache" +
+          " timeout of " + cacheTimeout/1000 + " seconds.");
+      initialized = true;
+    }
   }
   
   /**

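To make the new create-for-write path concrete: a short sketch using only members shown in this hunk; the path and permission bits are illustrative.

    import java.io.File;
    import java.io.FileOutputStream;
    import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
    import org.apache.hadoop.io.nativeio.NativeIO;

    public class CreateForWriteSketch {
      public static void main(String[] args) throws Exception {
        File f = new File("/tmp/example.out"); // illustrative path
        try {
          // 0644 = rw-r--r--. On POSIX this maps to open(2) with
          // O_WRONLY | O_CREAT | O_EXCL; on Windows, to CreateFile with CREATE_NEW.
          FileOutputStream fos =
              NativeIO.getCreateForWriteFileOutputStream(f, 0644);
          fos.write(new byte[] { 1, 2, 3 });
          fos.close();
        } catch (AlreadyExistsException aee) {
          // Same exception on both platforms when the file already exists.
          System.err.println("already exists: " + f);
        }
      }
    }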
Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java Mon Apr  1 16:47:16 2013
@@ -18,20 +18,40 @@
 package org.apache.hadoop.io.nativeio;
 
 import java.io.IOException;
+import org.apache.hadoop.util.Shell;
+
 
 /**
  * An exception generated by a call to the native IO code.
  *
- * These exceptions simply wrap <i>errno</i> result codes.
+ * These exceptions simply wrap <i>errno</i> result codes on Linux,
+ * or the System Error Code on Windows.
  */
 public class NativeIOException extends IOException {
   private static final long serialVersionUID = 1L;
 
   private Errno errno;
 
+  // Java has no unsigned primitive type. Use a signed 32-bit
+  // integer to hold the unsigned 32-bit Windows error code.
+  private int errorCode;
+
   public NativeIOException(String msg, Errno errno) {
     super(msg);
     this.errno = errno;
+    // When constructed from an errno (i.e. on Linux), the Windows error
+    // code is set to ERROR_SUCCESS, meaning no Windows-specific failure.
+    this.errorCode = 0;
+  }
+
+  public NativeIOException(String msg, int errorCode) {
+    super(msg);
+    this.errorCode = errorCode;
+    this.errno = Errno.UNKNOWN;
+  }
+
+  public long getErrorCode() {
+    return errorCode;
   }
 
   public Errno getErrno() {
@@ -40,8 +60,10 @@ public class NativeIOException extends I
 
   @Override
   public String toString() {
-    return errno.toString() + ": " + super.getMessage();
+    if (Shell.WINDOWS)
+      return errorCode + ": " + super.getMessage();
+    else
+      return errno.toString() + ": " + super.getMessage();
   }
 }
 
-

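With the dual errno/error-code representation, platform-independent callers can branch on Shell.WINDOWS. A hedged sketch (error code 3, ERROR_PATH_NOT_FOUND, mirrors the chmod translation earlier in this commit):

    import org.apache.hadoop.io.nativeio.Errno;
    import org.apache.hadoop.io.nativeio.NativeIOException;
    import org.apache.hadoop.util.Shell;

    public class NativeErrorSketch {
      static boolean isNotFound(NativeIOException nioe) {
        if (Shell.WINDOWS) {
          // Only the numeric system error code is meaningful on Windows;
          // the new int constructor sets errno to Errno.UNKNOWN.
          return nioe.getErrorCode() == 3; // ERROR_PATH_NOT_FOUND
        }
        // On Linux errorCode stays 0 (ERROR_SUCCESS) and errno carries the cause.
        return nioe.getErrno() == Errno.ENOENT;
      }
    }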
Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Apr  1 16:47:16 2013
@@ -59,7 +59,6 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
@@ -84,6 +83,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.Time;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedOutputStream;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -243,7 +243,7 @@ public class Client {
       callComplete();
     }
     
-    public synchronized Writable getRpcResult() {
+    public synchronized Writable getRpcResponse() {
       return rpcResponse;
     }
   }
@@ -257,6 +257,7 @@ public class Client {
     private final ConnectionId remoteId;                // connection id
     private AuthMethod authMethod; // authentication method
     private Token<? extends TokenIdentifier> token;
+    private int serviceClass;
     private SaslRpcClient saslRpcClient;
     
     private Socket socket = null;                 // connected socket
@@ -279,7 +280,7 @@ public class Client {
     
     private final Object sendRpcRequestLock = new Object();
 
-    public Connection(ConnectionId remoteId) throws IOException {
+    public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
       this.remoteId = remoteId;
       this.server = remoteId.getAddress();
       if (server.isUnresolved()) {
@@ -296,6 +297,7 @@ public class Client {
       this.tcpNoDelay = remoteId.getTcpNoDelay();
       this.doPing = remoteId.getDoPing();
       this.pingInterval = remoteId.getPingInterval();
+      this.serviceClass = serviceClass;
       if (LOG.isDebugEnabled()) {
         LOG.debug("The ping interval is " + this.pingInterval + " ms.");
       }
@@ -747,7 +749,9 @@ public class Client {
      * +----------------------------------+
      * |  "hrpc" 4 bytes                  |      
      * +----------------------------------+
-     * |  Version (1 bytes)               |      
+     * |  Version (1 byte)                |
+     * +----------------------------------+
+     * |  Service Class (1 byte)          |
      * +----------------------------------+
      * |  Authmethod (1 byte)             |      
      * +----------------------------------+
@@ -760,6 +764,7 @@ public class Client {
       // Write out the header, version and authentication method
       out.write(Server.HEADER.array());
       out.write(Server.CURRENT_VERSION);
+      out.write(serviceClass);
       authMethod.write(out);
       Server.IpcSerializationType.PROTOBUF.write(out);
       out.flush();
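A standalone sketch of the bytes writeConnectionHeader() now emits, per the diagram above; the version, auth-method, and serialization-type byte values are illustrative, since the real ones come from Server.CURRENT_VERSION, AuthMethod.write(), and IpcSerializationType.write():

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    public class HeaderSketch {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.write(new byte[] { 'h', 'r', 'p', 'c' }); // 4-byte magic
        out.write(9);  // protocol version, 1 byte (illustrative value)
        out.write(0);  // service class, 1 byte -- the new field
        out.write(80); // auth method code, 1 byte (illustrative value)
        out.write(0);  // serialization type, 1 byte (PROTOBUF)
        out.flush();
        System.out.println("header length = " + buf.size()); // 8
      }
    }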
@@ -945,31 +950,58 @@ public class Client {
       touch();
       
       try {
-        RpcResponseHeaderProto response = 
+        int totalLen = in.readInt();
+        RpcResponseHeaderProto header = 
             RpcResponseHeaderProto.parseDelimitedFrom(in);
-        if (response == null) {
+        if (header == null) {
           throw new IOException("Response is null.");
         }
+        int headerLen = header.getSerializedSize();
+        headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
 
-        int callId = response.getCallId();
+        int callId = header.getCallId();
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " got value #" + callId);
 
         Call call = calls.get(callId);
-        RpcStatusProto status = response.getStatus();
+        RpcStatusProto status = header.getStatus();
         if (status == RpcStatusProto.SUCCESS) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
           call.setRpcResponse(value);
           calls.remove(callId);
-        } else if (status == RpcStatusProto.ERROR) {
-          call.setException(new RemoteException(WritableUtils.readString(in),
-                                                WritableUtils.readString(in)));
-          calls.remove(callId);
-        } else if (status == RpcStatusProto.FATAL) {
-          // Close the connection
-          markClosed(new RemoteException(WritableUtils.readString(in), 
-                                         WritableUtils.readString(in)));
+          
+          // verify that length was correct
+          // only for ProtobufEngine where len can be verified easily
+          if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
+            ProtobufRpcEngine.RpcWrapper resWrapper = 
+                (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
+            if (totalLen != headerLen + resWrapper.getLength()) { 
+              throw new RpcClientException(
+                  "RPC response length mismatch on rpc success");
+            }
+          }
+        } else { // Rpc Request failed
+          // Verify that length was correct
+          if (totalLen != headerLen) {
+            throw new RpcClientException(
+                "RPC response length mismatch on rpc error");
+          }
+          
+          final String exceptionClassName = header.hasExceptionClassName() ?
+                header.getExceptionClassName() : 
+                  "ServerDidNotSetExceptionClassName";
+          final String errorMsg = header.hasErrorMsg() ? 
+                header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
+          RemoteException re = 
+              new RemoteException(exceptionClassName, errorMsg);
+          if (status == RpcStatusProto.ERROR) {
+            call.setException(re);
+            calls.remove(callId);
+          } else if (status == RpcStatusProto.FATAL) {
+            // Close the connection
+            markClosed(re);
+          }
         }
       } catch (IOException e) {
         markClosed(e);
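The new length check relies on protobuf's delimited framing: parseDelimitedFrom() consumes a varint length prefix plus the header body, so the expected total is the header length (including its own varint prefix) plus the payload length reported by the RpcWrapper. A worked example with illustrative sizes:

    import com.google.protobuf.CodedOutputStream;

    public class LengthCheckSketch {
      public static void main(String[] args) {
        int headerBody = 12;  // header.getSerializedSize() -- illustrative
        int headerLen =
            headerBody + CodedOutputStream.computeRawVarint32Size(headerBody);
        // varint(12) takes one byte, so headerLen == 13
        int payloadLen = 345; // resWrapper.getLength() -- illustrative
        System.out.println("expected totalLen = " + (headerLen + payloadLen)); // 358
      }
    }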
@@ -1152,20 +1184,34 @@ public class Client {
 
   
   /**
-   * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress, 
+   * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
    * Class, UserGroupInformation, int, Configuration)}
    * except that rpcKind is writable.
    */
-  public Writable call(Writable param, InetSocketAddress addr, 
+  public Writable call(Writable param, InetSocketAddress addr,
       Class<?> protocol, UserGroupInformation ticket,
-      int rpcTimeout, Configuration conf)  
+      int rpcTimeout, Configuration conf)
       throws InterruptedException, IOException {
-        ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
+    ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
     return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
   }
   
   /**
+   * Same as {@link #call(Writable, InetSocketAddress,
+   * Class, UserGroupInformation, int, Configuration)}
+   * except that the serviceClass is also specified.
+   */
+  public Writable call(Writable param, InetSocketAddress addr,
+      Class<?> protocol, UserGroupInformation ticket,
+      int rpcTimeout, int serviceClass, Configuration conf)
+      throws InterruptedException, IOException {
+    ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
+        ticket, rpcTimeout, conf);
+    return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass);
+  }
+
+  /**
    * Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code> which is servicing the <code>protocol</code> protocol,
    * with the <code>ticket</code> credentials, <code>rpcTimeout</code> as
@@ -1191,6 +1237,22 @@ public class Client {
      return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
   }
   
+  /**
+   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
+   * <code>remoteId</code>, returning the rpc response.
+   *
+   * @param rpcKind
+   * @param rpcRequest -  contains serialized method and method parameters
+   * @param remoteId - the target rpc server
+   * @return the rpc response
+   * Throws exceptions if there are network problems or if the remote code 
+   * threw an exception.
+   */
+  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+      ConnectionId remoteId) throws InterruptedException, IOException {
+    return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT);
+  }
+
   /** 
    * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
   * <code>remoteId</code>, returning the rpc response.
@@ -1198,14 +1260,16 @@ public class Client {
    * @param rpcKind
    * @param rpcRequest -  contains serialized method and method parameters
    * @param remoteId - the target rpc server
+   * @param serviceClass - service class for RPC
   * @return the rpc response
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception.
    */
   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
-      ConnectionId remoteId) throws InterruptedException, IOException {
+      ConnectionId remoteId, int serviceClass)
+      throws InterruptedException, IOException {
     Call call = new Call(rpcKind, rpcRequest);
-    Connection connection = getConnection(remoteId, call);
+    Connection connection = getConnection(remoteId, call, serviceClass);
     try {
       connection.sendRpcRequest(call);                 // send the rpc request
     } catch (RejectedExecutionException e) {
@@ -1245,7 +1309,7 @@ public class Client {
                   call.error);
         }
       } else {
-        return call.getRpcResult();
+        return call.getRpcResponse();
       }
     }
   }
@@ -1262,7 +1326,7 @@ public class Client {
   /** Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given ConnectionId are reused. */
   private Connection getConnection(ConnectionId remoteId,
-                                   Call call)
+                                   Call call, int serviceClass)
                                    throws IOException, InterruptedException {
     if (!running.get()) {
       // the client is stopped
@@ -1277,7 +1341,7 @@ public class Client {
       synchronized (connections) {
         connection = connections.get(remoteId);
         if (connection == null) {
-          connection = new Connection(remoteId);
+          connection = new Connection(remoteId, serviceClass);
           connections.put(remoteId, connection);
         }
       }

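Tying the serviceClass plumbing together: a hedged sketch of the new call() overload; the protocol interface, timeout, and service-class value are placeholders, not part of this patch.

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.ipc.Client;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ServiceClassSketch {
      interface MyProtocol {} // hypothetical protocol marker

      static Writable invoke(Client client, Writable param,
          InetSocketAddress addr, Configuration conf) throws Exception {
        int serviceClass = 1; // placeholder; 0 is RPC.RPC_SERVICE_CLASS_DEFAULT
        // The overload threads serviceClass into the Connection, where it is
        // written as the extra header byte shown in the diagram above.
        return client.call(param, addr, MyProtocol.class,
            UserGroupInformation.getCurrentUser(), 0 /* rpcTimeout */,
            serviceClass, conf);
      }
    }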
Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Mon Apr  1 16:47:16 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.ipc;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
@@ -39,7 +40,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
-import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestProto;
+import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -47,7 +48,9 @@ import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.AbstractMessageLite;
 import com.google.protobuf.BlockingService;
+import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
@@ -128,25 +131,12 @@ public class ProtobufRpcEngine implement
           .getProtocolVersion(protocol);
     }
 
-    private RequestProto constructRpcRequest(Method method,
-        Object[] params) throws ServiceException {
-      RequestProto rpcRequest;
-      RequestProto.Builder builder = RequestProto
+    private RequestHeaderProto constructRpcRequestHeader(Method method) {
+      RequestHeaderProto.Builder builder = RequestHeaderProto
           .newBuilder();
       builder.setMethodName(method.getName());
+
 
-      if (params.length != 2) { // RpcController + Message
-        throw new ServiceException("Too many parameters for request. Method: ["
-            + method.getName() + "]" + ", Expected: 2, Actual: "
-            + params.length);
-      }
-      if (params[1] == null) {
-        throw new ServiceException("null param while calling Method: ["
-            + method.getName() + "]");
-      }
-
-      Message param = (Message) params[1];
-      builder.setRequest(param.toByteString());
       // For protobuf, {@code protocol} used when creating client side proxy is
       // the interface extending BlockingInterface, which has the annotations 
       // such as ProtocolName etc.
@@ -160,8 +150,7 @@ public class ProtobufRpcEngine implement
       // For PB this may limit the use of mixins on client side.
       builder.setDeclaringClassProtocolName(protocolName);
       builder.setClientProtocolVersion(clientProtocolVersion);
-      rpcRequest = builder.build();
-      return rpcRequest;
+      return builder.build();
     }
 
     /**
@@ -189,8 +178,18 @@ public class ProtobufRpcEngine implement
       if (LOG.isDebugEnabled()) {
         startTime = Time.now();
       }
+      
+      if (args.length != 2) { // RpcController + Message
+        throw new ServiceException("Too many parameters for request. Method: ["
+            + method.getName() + "]" + ", Expected: 2, Actual: "
+            + args.length);
+      }
+      if (args[1] == null) {
+        throw new ServiceException("null param while calling Method: ["
+            + method.getName() + "]");
+      }
 
-      RequestProto rpcRequest = constructRpcRequest(method, args);
+      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
       RpcResponseWrapper val = null;
       
       if (LOG.isTraceEnabled()) {
@@ -198,9 +197,12 @@ public class ProtobufRpcEngine implement
             remoteId + ": " + method.getName() +
             " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
       }
+
+
+      Message theRequest = (Message) args[1];
       try {
         val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
-            new RpcRequestWrapper(rpcRequest), remoteId);
+            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);
 
       } catch (Throwable e) {
         if (LOG.isTraceEnabled()) {
@@ -226,7 +228,7 @@ public class ProtobufRpcEngine implement
       Message returnMessage;
       try {
         returnMessage = prototype.newBuilderForType()
-            .mergeFrom(val.responseMessage).build();
+            .mergeFrom(val.theResponseRead).build();
 
         if (LOG.isTraceEnabled()) {
           LOG.trace(Thread.currentThread().getId() + ": Response <- " +
@@ -267,6 +269,9 @@ public class ProtobufRpcEngine implement
     }
   }
 
+  interface RpcWrapper extends Writable {
+    int getLength();
+  }
   /**
    * Wrapper for Protocol Buffer Requests
    * 
@@ -274,21 +279,26 @@ public class ProtobufRpcEngine implement
    * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} 
    * use type Writable as a wrapper to work across multiple RpcEngine kinds.
    */
-  private static class RpcRequestWrapper implements Writable {
-    RequestProto message;
+  private static class RpcRequestWrapper implements RpcWrapper {
+    RequestHeaderProto requestHeader;
+    Message theRequest; // on the client side, the request is held here
+    byte[] theRequestRead; // on the server side, the raw request bytes are held here
 
     @SuppressWarnings("unused")
     public RpcRequestWrapper() {
     }
 
-    RpcRequestWrapper(RequestProto message) {
-      this.message = message;
+    RpcRequestWrapper(RequestHeaderProto requestHeader, Message theRequest) {
+      this.requestHeader = requestHeader;
+      this.theRequest = theRequest;
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
-      ((Message)message).writeDelimitedTo(
-          DataOutputOutputStream.constructOutputStream(out));
+      OutputStream os = DataOutputOutputStream.constructOutputStream(out);
+      
+      ((Message)requestHeader).writeDelimitedTo(os);
+      theRequest.writeDelimitedTo(os);
     }
 
     @Override
@@ -296,13 +306,32 @@ public class ProtobufRpcEngine implement
       int length = ProtoUtil.readRawVarint32(in);
       byte[] bytes = new byte[length];
       in.readFully(bytes);
-      message = RequestProto.parseFrom(bytes);
+      requestHeader = RequestHeaderProto.parseFrom(bytes);
+      length = ProtoUtil.readRawVarint32(in);
+      theRequestRead = new byte[length];
+      in.readFully(theRequestRead);
     }
     
     @Override
     public String toString() {
-      return message.getDeclaringClassProtocolName() + "." +
-          message.getMethodName();
+      return requestHeader.getDeclaringClassProtocolName() + "." +
+          requestHeader.getMethodName();
+    }
+
+    @Override
+    public int getLength() {
+      int headerLen = requestHeader.getSerializedSize();
+      int reqLen;
+      if (theRequest != null) {
+        reqLen = theRequest.getSerializedSize();
+      } else if (theRequestRead != null) {
+        reqLen = theRequestRead.length;
+      } else {
+        throw new IllegalArgumentException(
+            "getLength on uninitialized RpcWrapper");
+      }
+      return CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen
+          + CodedOutputStream.computeRawVarint32Size(reqLen) + reqLen;
     }
   }
 
@@ -313,29 +342,43 @@ public class ProtobufRpcEngine implement
    * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} 
    * use type Writable as a wrapper to work across multiple RpcEngine kinds.
    */
-  private static class RpcResponseWrapper implements Writable {
-    byte[] responseMessage;
+  private static class RpcResponseWrapper implements RpcWrapper {
+    Message theResponse; // on the sending side, the response is held here
+    byte[] theResponseRead; // on the receiving side, the raw response bytes are held here
 
     @SuppressWarnings("unused")
     public RpcResponseWrapper() {
     }
 
     public RpcResponseWrapper(Message message) {
-      this.responseMessage = message.toByteArray();
+      this.theResponse = message;
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
-      out.writeInt(responseMessage.length);
-      out.write(responseMessage);     
+      OutputStream os = DataOutputOutputStream.constructOutputStream(out);
+      theResponse.writeDelimitedTo(os);   
     }
 
     @Override
     public void readFields(DataInput in) throws IOException {
-      int length = in.readInt();
-      byte[] bytes = new byte[length];
-      in.readFully(bytes);
-      responseMessage = bytes;
+      int length = ProtoUtil.readRawVarint32(in);
+      theResponseRead = new byte[length];
+      in.readFully(theResponseRead);
+    }
+    
+    @Override
+    public int getLength() {
+      int resLen;
+      if (theResponse != null) {
+        resLen = theResponse.getSerializedSize();
+      } else if (theResponseRead != null) {
+        resLen = theResponseRead.length;
+      } else {
+        throw new IllegalArgumentException(
+            "getLength on uninitialized RpcWrapper");
+      }
+      return CodedOutputStream.computeRawVarint32Size(resLen) + resLen;
     }
   }
 
@@ -434,7 +477,7 @@ public class ProtobufRpcEngine implement
       public Writable call(RPC.Server server, String connectionProtocolName,
           Writable writableRequest, long receiveTime) throws Exception {
         RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
-        RequestProto rpcRequest = request.message;
+        RequestHeaderProto rpcRequest = request.requestHeader;
         String methodName = rpcRequest.getMethodName();
         
         
@@ -474,7 +517,8 @@ public class ProtobufRpcEngine implement
         }
         Message prototype = service.getRequestPrototype(methodDescriptor);
         Message param = prototype.newBuilderForType()
-            .mergeFrom(rpcRequest.getRequest()).build();
+            .mergeFrom(request.theRequestRead).build();
+        
         Message result;
         try {
           long startTime = Time.now();

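The wrapper changes above replace one embedded-bytes RequestProto with two back-to-back varint-delimited protobuf messages (the header, then the request itself), and getLength() prices exactly that framing. A minimal sketch of the arithmetic, assuming only the protobuf 2.x CodedOutputStream API this patch already imports; the message sizes are made-up numbers:

import com.google.protobuf.CodedOutputStream;

// Sketch of the on-wire cost of writeDelimitedTo(): each message is
// prefixed with its serialized length encoded as a varint.
public class DelimitedFramingDemo {
  static int framedLength(int bodyLen) {
    return CodedOutputStream.computeRawVarint32Size(bodyLen) + bodyLen;
  }

  public static void main(String[] args) {
    int headerLen = 40;   // hypothetical RequestHeaderProto size
    int requestLen = 300; // hypothetical request message size
    // Varints take 1 byte below 128 and 2 bytes below 16384, so this is
    // (1 + 40) + (2 + 300) = 343, matching RpcRequestWrapper.getLength().
    System.out.println(framedLength(headerLen) + framedLength(requestLen));
  }
}
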
Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Mon Apr  1 16:47:16 2013
@@ -77,12 +77,12 @@ import com.google.protobuf.BlockingServi
 @InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
 @InterfaceStability.Evolving
 public class RPC {
+  final static int RPC_SERVICE_CLASS_DEFAULT = 0;
   public enum RpcKind {
     RPC_BUILTIN ((short) 1),         // Used for built in calls by tests
     RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine 
     RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
     final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
-    private static final short FIRST_INDEX = RPC_BUILTIN.value;    
     public final short value; //TODO make it private
 
     RpcKind(short val) {

Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Apr  1 16:47:16 2013
@@ -24,6 +24,7 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -72,6 +73,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -106,6 +108,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.CodedOutputStream;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -201,7 +204,8 @@ public abstract class Server {
   // 6 : Made RPC Request header explicit
   // 7 : Changed Ipc Connection Header to use Protocol buffers
   // 8 : SASL server always sends a final response
-  public static final byte CURRENT_VERSION = 8;
+  // 9 : Changes to protocol for HADOOP-8990
+  public static final byte CURRENT_VERSION = 9;
 
   /**
    * Initial and max size of response buffer
@@ -313,6 +317,14 @@ public abstract class Server {
     return (addr == null) ? null : addr.getHostAddress();
   }
 
+  /** Returns the RPC remote user when invoked inside an RPC.  Note this
+   *  may be different from the current user if called within another doAs.
+   *  @return connection's UGI or null if not an RPC
+   */
+  public static UserGroupInformation getRemoteUser() {
+    Call call = CurCall.get();
+    return (call != null) ? call.connection.user : null;
+  }
  
   /** Return true if the invocation was through an RPC.
    */
@@ -426,6 +438,11 @@ public abstract class Server {
     return Arrays.asList(handlers);
   }
 
+  @VisibleForTesting
+  List<Connection> getConnections() {
+    return connectionList;
+  }
+
   /**
    * Refresh the service authorization ACL for the service handled by this server.
    */
@@ -1092,6 +1109,7 @@ public abstract class Server {
     private ByteBuffer connectionHeaderBuf = null;
     private ByteBuffer unwrappedData;
     private ByteBuffer unwrappedDataLengthBuffer;
+    private int serviceClass;
     
     UserGroupInformation user = null;
     public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1219,7 +1237,8 @@ public abstract class Server {
           rpcMetrics.incrAuthenticationFailures();
           String clientIP = this.toString();
           // attempting user could be null
-          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
+          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
+            " (" + e.getLocalizedMessage() + ")");
           throw e;
         }
         if (saslServer.isComplete() && replyToken == null) {
@@ -1302,14 +1321,17 @@ public abstract class Server {
         if (!connectionHeaderRead) {
           //Every connection is expected to send the header.
           if (connectionHeaderBuf == null) {
-            connectionHeaderBuf = ByteBuffer.allocate(3);
+            connectionHeaderBuf = ByteBuffer.allocate(4);
           }
           count = channelRead(channel, connectionHeaderBuf);
           if (count < 0 || connectionHeaderBuf.remaining() > 0) {
             return count;
           }
           int version = connectionHeaderBuf.get(0);
-          byte[] method = new byte[] {connectionHeaderBuf.get(1)};
+          // TODO: add a handler for the service class later
+          this.setServiceClass(connectionHeaderBuf.get(1));
+
+          byte[] method = new byte[] {connectionHeaderBuf.get(2)};
           authMethod = AuthMethod.read(new DataInputStream(
               new ByteArrayInputStream(method)));
           dataLengthBuffer.flip();
@@ -1333,7 +1355,7 @@ public abstract class Server {
           }
           
           IpcSerializationType serializationType = IpcSerializationType
-              .fromByte(connectionHeaderBuf.get(2));
+              .fromByte(connectionHeaderBuf.get(3));
           if (serializationType != IpcSerializationType.PROTOBUF) {
             respondUnsupportedSerialization(serializationType);
             return -1;
@@ -1503,10 +1525,15 @@ public abstract class Server {
       " cannot communicate with client version " + clientVersion;
       ByteArrayOutputStream buffer = new ByteArrayOutputStream();
       
-      if (clientVersion >= 3) {
+      if (clientVersion >= 9) {
+        // Versions 9 and above understand the normal response format
         Call fakeCall =  new Call(-1, null, this);
-        // Versions 3 and greater can interpret this exception
-        // response in the same manner
+        setupResponse(buffer, fakeCall, RpcStatusProto.FATAL,
+            null, VersionMismatch.class.getName(), errMsg);
+        responder.doRespond(fakeCall);
+      } else if (clientVersion >= 3) {
+        Call fakeCall =  new Call(-1, null, this);
+        // Versions 3 through 8 use the older response format
         setupResponseOldVersionFatal(buffer, fakeCall,
             null, VersionMismatch.class.getName(), errMsg);
 
@@ -1553,9 +1580,6 @@ public abstract class Server {
       UserGroupInformation protocolUser = ProtoUtil.getUgi(connectionContext);
       if (saslServer == null) {
         user = protocolUser;
-        if (user != null) {
-          user.setAuthenticationMethod(AuthMethod.SIMPLE);
-        }
       } else {
         // user is authenticated
         user.setAuthenticationMethod(authMethod);
@@ -1721,6 +1745,22 @@ public abstract class Server {
       return true;
     }
     
+    /**
+     * Get service class for connection
+     * @return the serviceClass
+     */
+    public int getServiceClass() {
+      return serviceClass;
+    }
+
+    /**
+     * Set service class for connection
+     * @param serviceClass the serviceClass to set
+     */
+    public void setServiceClass(int serviceClass) {
+      this.serviceClass = serviceClass;
+    }
+
     private synchronized void close() throws IOException {
       disposeSasl();
       data = null;
@@ -1783,6 +1823,9 @@ public abstract class Server {
                   );
             }
           } catch (Throwable e) {
+            if (e instanceof UndeclaredThrowableException) {
+              e = e.getCause();
+            }
             String logMsg = getName() + ", call " + call + ": error: " + e;
             if (e instanceof RuntimeException || e instanceof Error) {
               // These exception types indicate something is probably wrong
@@ -1988,16 +2031,34 @@ public abstract class Server {
   throws IOException {
     responseBuf.reset();
     DataOutputStream out = new DataOutputStream(responseBuf);
-    RpcResponseHeaderProto.Builder response =  
+    RpcResponseHeaderProto.Builder headerBuilder =  
         RpcResponseHeaderProto.newBuilder();
-    response.setCallId(call.callId);
-    response.setStatus(status);
-
+    headerBuilder.setCallId(call.callId);
+    headerBuilder.setStatus(status);
+    headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION);
 
     if (status == RpcStatusProto.SUCCESS) {
+      RpcResponseHeaderProto header = headerBuilder.build();
+      final int headerLen = header.getSerializedSize();
+      int fullLength  = CodedOutputStream.computeRawVarint32Size(headerLen) +
+          headerLen;
       try {
-        response.build().writeDelimitedTo(out);
-        rv.write(out);
+        if (rv instanceof ProtobufRpcEngine.RpcWrapper) {
+          ProtobufRpcEngine.RpcWrapper resWrapper = 
+              (ProtobufRpcEngine.RpcWrapper) rv;
+          fullLength += resWrapper.getLength();
+          out.writeInt(fullLength);
+          header.writeDelimitedTo(out);
+          rv.write(out);
+        } else { // Have to serialize to buffer to get len
+          final DataOutputBuffer buf = new DataOutputBuffer();
+          rv.write(buf);
+          byte[] data = buf.getData();
+          fullLength += buf.getLength();
+          out.writeInt(fullLength);
+          header.writeDelimitedTo(out);
+          out.write(data, 0, buf.getLength());
+        }
       } catch (Throwable t) {
         LOG.warn("Error serializing call response for call " + call, t);
         // Call back to same function - this is OK since the
@@ -2008,13 +2069,15 @@ public abstract class Server {
             StringUtils.stringifyException(t));
         return;
       }
-    } else {
-      if (status == RpcStatusProto.FATAL) {
-        response.setServerIpcVersionNum(Server.CURRENT_VERSION);
-      }
-      response.build().writeDelimitedTo(out);
-      WritableUtils.writeString(out, errorClass);
-      WritableUtils.writeString(out, error);
+    } else { // Rpc Failure
+      headerBuilder.setExceptionClassName(errorClass);
+      headerBuilder.setErrorMsg(error);
+      RpcResponseHeaderProto header = headerBuilder.build();
+      int headerLen = header.getSerializedSize();
+      final int fullLength  = 
+          CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen;
+      out.writeInt(fullLength);
+      header.writeDelimitedTo(out);
     }
     if (call.connection.useWrap) {
       wrapWithSasl(responseBuf, call);

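The setupResponse() rework above gives version-9 responses a uniform layout: a 4-byte total length (which does not count itself), then the varint-delimited RpcResponseHeaderProto, then the payload; errors now travel inside the header instead of as two writable strings. The sketch below restates that layout with plain streams, under the assumption that the header and payload are already-serialized byte arrays and that the payload carries any delimiting of its own:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Standalone restatement of the version-9 response frame; not Hadoop code.
public class ResponseFramingDemo {
  static byte[] frame(byte[] header, byte[] payload) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    int fullLength = varintSize(header.length) + header.length
        + payload.length;
    out.writeInt(fullLength);         // the 4 length bytes are not counted
    writeVarint(out, header.length);  // delimit the header, protobuf-style
    out.write(header);
    out.write(payload);               // assumed to be self-delimiting
    out.flush();
    return buf.toByteArray();
  }

  static int varintSize(int v) {
    int n = 1;
    while ((v >>>= 7) != 0) n++;
    return n;
  }

  static void writeVarint(DataOutputStream out, int v) throws IOException {
    while ((v & ~0x7F) != 0) {
      out.writeByte((v & 0x7F) | 0x80);
      v >>>= 7;
    }
    out.writeByte(v);
  }
}
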
Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/MetricsServlet.java Mon Apr  1 16:47:16 2013
@@ -140,10 +140,12 @@ public class MetricsServlet extends Http
    */
   void printMap(PrintWriter out, Map<String, Map<String, List<TagsMetricsPair>>> map) {
     for (Map.Entry<String, Map<String, List<TagsMetricsPair>>> context : map.entrySet()) {
-      out.println(context.getKey());
+      out.print(context.getKey());
+      out.print("\n");
       for (Map.Entry<String, List<TagsMetricsPair>> record : context.getValue().entrySet()) {
         indent(out, 1);
-        out.println(record.getKey());
+        out.print(record.getKey());
+        out.print("\n");
         for (TagsMetricsPair pair : record.getValue()) {
           indent(out, 2);
           // Prints tag values in the form "{key=value,key=value}:"
@@ -159,7 +161,7 @@ public class MetricsServlet extends Http
             out.print("=");
             out.print(tagValue.getValue().toString());
           }
-          out.println("}:");
+          out.print("}:\n");
           
           // Now print metric values, one per line
           for (Map.Entry<String, Number> metricValue : 
@@ -167,7 +169,8 @@ public class MetricsServlet extends Http
             indent(out, 3);
             out.print(metricValue.getKey());
             out.print("=");
-            out.println(metricValue.getValue().toString());
+            out.print(metricValue.getValue().toString());
+            out.print("\n");
           }
         }
       }

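The println-to-print swap above is presumably about line endings: println() appends the platform separator (CRLF on Windows), while print("\n") always emits a single LF, so the servlet's text output stays byte-identical across platforms. A tiny illustration:

import java.io.PrintWriter;
import java.io.StringWriter;

// "metric=1" is 8 characters; with a bare LF the total is 9 on every
// platform, whereas println() would yield 10 on Windows.
public class NewlineDemo {
  public static void main(String[] args) {
    StringWriter sw = new StringWriter();
    PrintWriter out = new PrintWriter(sw);
    out.print("metric=1");
    out.print("\n"); // always the single byte 0x0A
    out.flush();
    System.out.println(sw.toString().length()); // 9
  }
}
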
Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java Mon Apr  1 16:47:16 2013
@@ -149,4 +149,9 @@ public class CachedDNSToSwitchMapping ex
   public boolean isSingleSwitch() {
     return isMappingSingleSwitch(rawMapping);
   }
+  
+  @Override
+  public void reloadCachedMappings() {
+    cache.clear();
+  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNS.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNS.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNS.java Mon Apr  1 16:47:16 2013
@@ -89,7 +89,12 @@ public class DNS {
       ictx.close();
     }
 
-    return attribute.get("PTR").get().toString();
+    String hostname = attribute.get("PTR").get().toString();
+    int hostnameLength = hostname.length();
+    if (hostname.charAt(hostnameLength - 1) == '.') {
+      hostname = hostname.substring(0, hostnameLength - 1);
+    }
+    return hostname;
   }
 
   /**

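The reverseDns() change above strips the trailing dot that PTR answers carry, since the resolver returns fully qualified names such as "host1.example.com.". A standalone restatement; the empty-string guard is an extra precaution not present in the patch:

// Not Hadoop code; mirrors the trailing-dot trim added to DNS.reverseDns().
public class TrimTrailingDotDemo {
  static String trim(String hostname) {
    int len = hostname.length();
    if (len > 0 && hostname.charAt(len - 1) == '.') {
      return hostname.substring(0, len - 1);
    }
    return hostname;
  }

  public static void main(String[] args) {
    System.out.println(trim("host1.example.com.")); // host1.example.com
  }
}
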
Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMapping.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMapping.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMapping.java Mon Apr  1 16:47:16 2013
@@ -51,4 +51,12 @@ public interface DNSToSwitchMapping {
    * If <i>names</i> is empty, the returned list is also empty
    */
   public List<String> resolve(List<String> names);
+
+  /**
+   * Reload all of the cached mappings.
+   *
+   * If there is a cache, this method will clear it, so that future accesses
+   * will get a chance to see the new data.
+   */
+  public void reloadCachedMappings();
 }

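For implementors of the new interface method: the contract is simply that a cached mapping must drop its cache so the next resolve() re-queries the underlying source, as CachedDNSToSwitchMapping.reloadCachedMappings() does above. A minimal hypothetical implementation; the "/default-rack" lookup is a placeholder, not Hadoop's rack-resolution logic:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical mapping honoring the reloadCachedMappings() contract.
class SketchCachedMapping {
  private final Map<String, String> cache =
      new ConcurrentHashMap<String, String>();

  public List<String> resolve(List<String> names) {
    List<String> racks = new ArrayList<String>(names.size());
    for (String name : names) {
      String rack = cache.get(name);
      if (rack == null) {
        rack = "/default-rack"; // stand-in for the real lookup
        cache.put(name, rack);
      }
      racks.add(rack);
    }
    return racks;
  }

  public void reloadCachedMappings() {
    cache.clear(); // the next resolve() sees fresh data
  }
}
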
Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java Mon Apr  1 16:47:16 2013
@@ -392,8 +392,16 @@ public class NetworkTopology {
       throw new IllegalArgumentException(
         "Not allow to add an inner node: "+NodeBase.getPath(node));
     }
+    int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
     netlock.writeLock().lock();
     try {
+      if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
+        LOG.error("Error: can't add leaf node at depth " +
+            newDepth + " to topology:\n" + oldTopoStr);
+        throw new InvalidTopologyException("Invalid network topology. " +
+            "You cannot have a rack and a non-rack node at the same " +
+            "level of the network topology.");
+      }
       Node rack = getNodeForNetworkLocation(node);
       if (rack != null && !(rack instanceof InnerNode)) {
         throw new IllegalArgumentException("Unexpected data node " 
@@ -408,14 +416,6 @@ public class NetworkTopology {
         if (!(node instanceof InnerNode)) {
           if (depthOfAllLeaves == -1) {
             depthOfAllLeaves = node.getLevel();
-          } else {
-            if (depthOfAllLeaves != node.getLevel()) {
-              LOG.error("Error: can't add leaf node at depth " +
-                  node.getLevel() + " to topology:\n" + oldTopoStr);
-              throw new InvalidTopologyException("Invalid network topology. " +
-                  "You cannot have a rack and a non-rack node at the same " +
-                  "level of the network topology.");
-            }
           }
         }
       }

Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java Mon Apr  1 16:47:16 2013
@@ -167,4 +167,16 @@ public class NodeBase implements Node {
   public void setLevel(int level) {
     this.level = level;
   }
+  
+  public static int locationToDepth(String location) {
+    String normalizedLocation = normalize(location);
+    int length = normalizedLocation.length();
+    int depth = 0;
+    for (int i = 0; i < length; i++) {
+      if (normalizedLocation.charAt(i) == PATH_SEPARATOR) {
+        depth++;
+      }
+    }
+    return depth;
+  }
 }

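Usage sketch for the new locationToDepth() helper: depth is the number of path separators in the normalized location, so a leaf placed under that location sits one level deeper, which is the newDepth that NetworkTopology.add() now checks before taking the write lock. The sketch inlines the separator count and skips NodeBase.normalize():

// Not Hadoop code; illustrates the depth arithmetic used above.
public class DepthDemo {
  static int locationToDepth(String location) {
    int depth = 0;
    for (int i = 0; i < location.length(); i++) {
      if (location.charAt(i) == '/') depth++; // PATH_SEPARATOR
    }
    return depth;
  }

  public static void main(String[] args) {
    System.out.println(locationToDepth("/d1/r1"));     // 2
    System.out.println(locationToDepth("/d1/r1") + 1); // 3: depth of a leaf there
  }
}
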
Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java Mon Apr  1 16:47:16 2013
@@ -263,5 +263,11 @@ public final class ScriptBasedMapping ex
     public String toString() {
       return scriptName != null ? ("script " + scriptName) : NO_SCRIPT;
     }
+
+    @Override
+    public void reloadCachedMappings() {
+      // Nothing to do here, since RawScriptBasedMapping has no cache, and
+      // does not inherit from CachedDNSToSwitchMapping
+    }
   }
 }


