hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1399950 [8/17] - in /hadoop/common/branches/HDFS-2802/hadoop-common-project: hadoop-annotations/ hadoop-annotations/src/main/java/org/apache/hadoop/classification/tools/ hadoop-auth-examples/ hadoop-auth/ hadoop-auth/src/main/java/org/apac...
Date Fri, 19 Oct 2012 02:27:38 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java Fri Oct 19 02:25:55 2012
@@ -20,15 +20,11 @@ package org.apache.hadoop.io.compress;
 
 import java.io.*;
 import java.util.zip.GZIPOutputStream;
-import java.util.zip.GZIPInputStream;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.zlib.*;
-import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
-import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 
 /**
  * This class creates gzip compressors/decompressors. 
@@ -66,32 +62,39 @@ public class GzipCodec extends DefaultCo
       super(out);
     }
     
+    @Override
     public void close() throws IOException {
       out.close();
     }
     
+    @Override
     public void flush() throws IOException {
       out.flush();
     }
     
+    @Override
     public void write(int b) throws IOException {
       out.write(b);
     }
     
+    @Override
     public void write(byte[] data, int offset, int length) 
       throws IOException {
       out.write(data, offset, length);
     }
     
+    @Override
     public void finish() throws IOException {
       ((ResetableGZIPOutputStream) out).finish();
     }
 
+    @Override
     public void resetState() throws IOException {
       ((ResetableGZIPOutputStream) out).resetState();
     }
   }
 
+  @Override
   public CompressionOutputStream createOutputStream(OutputStream out) 
     throws IOException {
     return (ZlibFactory.isNativeZlibLoaded(conf)) ?
@@ -100,6 +103,7 @@ public class GzipCodec extends DefaultCo
                new GzipOutputStream(out);
   }
   
+  @Override
   public CompressionOutputStream createOutputStream(OutputStream out, 
                                                     Compressor compressor) 
   throws IOException {
@@ -110,23 +114,27 @@ public class GzipCodec extends DefaultCo
                createOutputStream(out);
   }
 
+  @Override
   public Compressor createCompressor() {
     return (ZlibFactory.isNativeZlibLoaded(conf))
       ? new GzipZlibCompressor(conf)
       : null;
   }
 
+  @Override
   public Class<? extends Compressor> getCompressorType() {
     return ZlibFactory.isNativeZlibLoaded(conf)
       ? GzipZlibCompressor.class
       : null;
   }
 
+  @Override
   public CompressionInputStream createInputStream(InputStream in)
   throws IOException {
     return createInputStream(in, null);
   }
 
+  @Override
   public CompressionInputStream createInputStream(InputStream in,
                                                   Decompressor decompressor)
   throws IOException {
@@ -137,18 +145,21 @@ public class GzipCodec extends DefaultCo
                                   conf.getInt("io.file.buffer.size", 4*1024));
   }
 
+  @Override
   public Decompressor createDecompressor() {
     return (ZlibFactory.isNativeZlibLoaded(conf))
       ? new GzipZlibDecompressor()
       : new BuiltInGzipDecompressor();
   }
 
+  @Override
   public Class<? extends Decompressor> getDecompressorType() {
     return ZlibFactory.isNativeZlibLoaded(conf)
       ? GzipZlibDecompressor.class
       : BuiltInGzipDecompressor.class;
   }
 
+  @Override
   public String getDefaultExtension() {
     return ".gz";
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java Fri Oct 19 02:25:55 2012
@@ -24,7 +24,6 @@ import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.snappy.LoadSnappy;
 import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
 import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -34,11 +33,6 @@ import org.apache.hadoop.util.NativeCode
  * This class creates snappy compressors/decompressors.
  */
 public class SnappyCodec implements Configurable, CompressionCodec {
-
-  static {
-    LoadSnappy.isLoaded();
-  }
-
   Configuration conf;
 
   /**
@@ -63,11 +57,26 @@ public class SnappyCodec implements Conf
 
   /**
    * Are the native snappy libraries loaded & initialized?
-   *
-   * @return true if loaded & initialized, otherwise false
    */
+  public static void checkNativeCodeLoaded() {
+      if (!NativeCodeLoader.buildSupportsSnappy()) {
+        throw new RuntimeException("native snappy library not available: " +
+            "this version of libhadoop was built without " +
+            "snappy support.");
+      }
+      if (!SnappyCompressor.isNativeCodeLoaded()) {
+        throw new RuntimeException("native snappy library not available: " +
+            "SnappyCompressor has not been loaded.");
+      }
+      if (!SnappyDecompressor.isNativeCodeLoaded()) {
+        throw new RuntimeException("native snappy library not available: " +
+            "SnappyDecompressor has not been loaded.");
+      }
+  }
+  
   public static boolean isNativeCodeLoaded() {
-    return LoadSnappy.isLoaded() && NativeCodeLoader.isNativeCodeLoaded();
+    return SnappyCompressor.isNativeCodeLoaded() && 
+        SnappyDecompressor.isNativeCodeLoaded();
   }
 
   /**
@@ -97,9 +106,7 @@ public class SnappyCodec implements Conf
   public CompressionOutputStream createOutputStream(OutputStream out,
                                                     Compressor compressor)
       throws IOException {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native snappy library not available");
-    }
+    checkNativeCodeLoaded();
     int bufferSize = conf.getInt(
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
@@ -117,10 +124,7 @@ public class SnappyCodec implements Conf
    */
   @Override
   public Class<? extends Compressor> getCompressorType() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native snappy library not available");
-    }
-
+    checkNativeCodeLoaded();
     return SnappyCompressor.class;
   }
 
@@ -131,9 +135,7 @@ public class SnappyCodec implements Conf
    */
   @Override
   public Compressor createCompressor() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native snappy library not available");
-    }
+    checkNativeCodeLoaded();
     int bufferSize = conf.getInt(
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
@@ -167,10 +169,7 @@ public class SnappyCodec implements Conf
   public CompressionInputStream createInputStream(InputStream in,
                                                   Decompressor decompressor)
       throws IOException {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native snappy library not available");
-    }
-
+    checkNativeCodeLoaded();
     return new BlockDecompressorStream(in, decompressor, conf.getInt(
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
@@ -183,10 +182,7 @@ public class SnappyCodec implements Conf
    */
   @Override
   public Class<? extends Decompressor> getDecompressorType() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native snappy library not available");
-    }
-
+    checkNativeCodeLoaded();
     return SnappyDecompressor.class;
   }
 
@@ -197,9 +193,7 @@ public class SnappyCodec implements Conf
    */
   @Override
   public Decompressor createDecompressor() {
-    if (!isNativeCodeLoaded()) {
-      throw new RuntimeException("native snappy library not available");
-    }
+    checkNativeCodeLoaded();
     int bufferSize = conf.getInt(
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java Fri Oct 19 02:25:55 2012
@@ -338,6 +338,7 @@ public class CBZip2InputStream extends I
   }
 
 
+  @Override
   public int read() throws IOException {
 
     if (this.in != null) {
@@ -372,6 +373,7 @@ public class CBZip2InputStream extends I
    */
 
 
+  @Override
   public int read(final byte[] dest, final int offs, final int len)
       throws IOException {
     if (offs < 0) {
@@ -574,6 +576,7 @@ public class CBZip2InputStream extends I
     }
   }
 
+  @Override
   public void close() throws IOException {
     InputStream inShadow = this.in;
     if (inShadow != null) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java Fri Oct 19 02:25:55 2012
@@ -639,6 +639,7 @@ public class CBZip2OutputStream extends 
     init();
   }
 
+  @Override
   public void write(final int b) throws IOException {
     if (this.out != null) {
       write0(b);
@@ -704,6 +705,7 @@ public class CBZip2OutputStream extends 
   /**
   * Overriden to close the stream.
   */
+  @Override
   protected void finalize() throws Throwable {
     finish();
     super.finalize();
@@ -726,6 +728,7 @@ public class CBZip2OutputStream extends 
     }
   }
 
+  @Override
   public void close() throws IOException {
     if (out != null) {
       OutputStream outShadow = this.out;
@@ -739,6 +742,7 @@ public class CBZip2OutputStream extends 
     }
   }
   
+  @Override
   public void flush() throws IOException {
     OutputStream outShadow = this.out;
     if (outShadow != null) {
@@ -849,6 +853,7 @@ public class CBZip2OutputStream extends 
     return this.blockSize100k;
   }
 
+  @Override
   public void write(final byte[] buf, int offs, final int len)
       throws IOException {
     if (offs < 0) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java Fri Oct 19 02:25:55 2012
@@ -258,6 +258,7 @@ public class Lz4Decompressor implements 
     return 0;
   }
 
+  @Override
   public synchronized void reset() {
     finished = false;
     compressedDirectBufLen = 0;

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java Fri Oct 19 02:25:55 2012
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
  * A {@link Compressor} based on the snappy compression algorithm.
@@ -51,22 +52,24 @@ public class SnappyCompressor implements
   private long bytesRead = 0L;
   private long bytesWritten = 0L;
 
-
+  private static boolean nativeSnappyLoaded = false;
+  
   static {
-    if (LoadSnappy.isLoaded()) {
-      // Initialize the native library
+    if (NativeCodeLoader.isNativeCodeLoaded() &&
+        NativeCodeLoader.buildSupportsSnappy()) {
       try {
         initIDs();
+        nativeSnappyLoaded = true;
       } catch (Throwable t) {
-        // Ignore failure to load/initialize snappy
-        LOG.warn(t.toString());
+        LOG.error("failed to load SnappyCompressor", t);
       }
-    } else {
-      LOG.error("Cannot load " + SnappyCompressor.class.getName() +
-          " without snappy library!");
     }
   }
-
+  
+  public static boolean isNativeCodeLoaded() {
+    return nativeSnappyLoaded;
+  }
+  
   /**
    * Creates a new compressor.
    *

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java Fri Oct 19 02:25:55 2012
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
  * A {@link Decompressor} based on the snappy compression algorithm.
@@ -47,21 +48,24 @@ public class SnappyDecompressor implemen
   private int userBufOff = 0, userBufLen = 0;
   private boolean finished;
 
+  private static boolean nativeSnappyLoaded = false;
+
   static {
-    if (LoadSnappy.isLoaded()) {
-      // Initialize the native library
+    if (NativeCodeLoader.isNativeCodeLoaded() &&
+        NativeCodeLoader.buildSupportsSnappy()) {
       try {
         initIDs();
+        nativeSnappyLoaded = true;
       } catch (Throwable t) {
-        // Ignore failure to load/initialize snappy
-        LOG.warn(t.toString());
+        LOG.error("failed to load SnappyDecompressor", t);
       }
-    } else {
-      LOG.error("Cannot load " + SnappyDecompressor.class.getName() +
-          " without snappy library!");
     }
   }
-
+  
+  public static boolean isNativeCodeLoaded() {
+    return nativeSnappyLoaded;
+  }
+  
   /**
    * Creates a new compressor.
    *
@@ -257,6 +261,7 @@ public class SnappyDecompressor implemen
     return 0;
   }
 
+  @Override
   public synchronized void reset() {
     finished = false;
     compressedDirectBufLen = 0;

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java Fri Oct 19 02:25:55 2012
@@ -122,7 +122,7 @@ public class BuiltInGzipDecompressor imp
     //        in the first buffer load?  (But how else would one do it?)
   }
 
-  /** {@inheritDoc} */
+  @Override
   public synchronized boolean needsInput() {
     if (state == GzipStateLabel.DEFLATE_STREAM) {  // most common case
       return inflater.needsInput();
@@ -144,6 +144,7 @@ public class BuiltInGzipDecompressor imp
    *        the bulk deflate stream, which is a performance hit we don't want
    *        to absorb.  (Decompressor now documents this requirement.)
    */
+  @Override
   public synchronized void setInput(byte[] b, int off, int len) {
     if (b == null) {
       throw new NullPointerException();
@@ -175,6 +176,7 @@ public class BuiltInGzipDecompressor imp
    * methods below), the deflate stream is never copied; Inflater operates
    * directly on the user's buffer.
    */
+  @Override
   public synchronized int decompress(byte[] b, int off, int len)
   throws IOException {
     int numAvailBytes = 0;
@@ -385,7 +387,7 @@ public class BuiltInGzipDecompressor imp
       copyBytesToLocal(n);       // modifies userBufLen, etc.
       if (localBufOff >= 4) {    // should be strictly ==
         long inputSize = readUIntLE(localBuf, 0);
-        if (inputSize != (inflater.getBytesWritten() & 0xffffffff)) {
+        if (inputSize != (inflater.getBytesWritten() & 0xffffffffL)) {
           throw new IOException(
             "stored gzip size doesn't match decompressed size");
         }
@@ -421,16 +423,17 @@ public class BuiltInGzipDecompressor imp
    *
    * @return the total (non-negative) number of unprocessed bytes in input
    */
+  @Override
   public synchronized int getRemaining() {
     return userBufLen;
   }
 
-  /** {@inheritDoc} */
+  @Override
   public synchronized boolean needsDictionary() {
     return inflater.needsDictionary();
   }
 
-  /** {@inheritDoc} */
+  @Override
   public synchronized void setDictionary(byte[] b, int off, int len) {
     inflater.setDictionary(b, off, len);
   }
@@ -439,6 +442,7 @@ public class BuiltInGzipDecompressor imp
    * Returns true if the end of the gzip substream (single "member") has been
    * reached.</p>
    */
+  @Override
   public synchronized boolean finished() {
     return (state == GzipStateLabel.FINISHED);
   }
@@ -447,6 +451,7 @@ public class BuiltInGzipDecompressor imp
    * Resets everything, including the input buffer, regardless of whether the
    * current gzip substream is finished.</p>
    */
+  @Override
   public synchronized void reset() {
     // could optionally emit INFO message if state != GzipStateLabel.FINISHED
     inflater.reset();
@@ -463,7 +468,7 @@ public class BuiltInGzipDecompressor imp
     hasHeaderCRC = false;
   }
 
-  /** {@inheritDoc} */
+  @Override
   public synchronized void end() {
     inflater.end();
   }
@@ -566,7 +571,7 @@ public class BuiltInGzipDecompressor imp
     return ((((long)(b[off+3] & 0xff) << 24) |
              ((long)(b[off+2] & 0xff) << 16) |
              ((long)(b[off+1] & 0xff) <<  8) |
-             ((long)(b[off]   & 0xff)      )) & 0xffffffff);
+             ((long)(b[off]   & 0xff)      )) & 0xffffffffL);
   }
 
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java Fri Oct 19 02:25:55 2012
@@ -48,6 +48,7 @@ public class BuiltInZlibDeflater extends
     super();
   }
 
+  @Override
   public synchronized int compress(byte[] b, int off, int len) 
     throws IOException {
     return super.deflate(b, off, len);

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java Fri Oct 19 02:25:55 2012
@@ -39,6 +39,7 @@ public class BuiltInZlibInflater extends
     super();
   }
 
+  @Override
   public synchronized int decompress(byte[] b, int off, int len) 
     throws IOException {
     try {

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Fri Oct 19 02:25:55 2012
@@ -259,6 +259,7 @@ public class ZlibCompressor implements C
     }
   }
 
+  @Override
   public synchronized void setInput(byte[] b, int off, int len) {
     if (b== null) {
       throw new NullPointerException();
@@ -287,6 +288,7 @@ public class ZlibCompressor implements C
     uncompressedDirectBufLen = uncompressedDirectBuf.position();
   }
 
+  @Override
   public synchronized void setDictionary(byte[] b, int off, int len) {
     if (stream == 0 || b == null) {
       throw new NullPointerException();
@@ -297,6 +299,7 @@ public class ZlibCompressor implements C
     setDictionary(stream, b, off, len);
   }
 
+  @Override
   public synchronized boolean needsInput() {
     // Consume remaining compressed data?
     if (compressedDirectBuf.remaining() > 0) {
@@ -325,16 +328,19 @@ public class ZlibCompressor implements C
     return false;
   }
   
+  @Override
   public synchronized void finish() {
     finish = true;
   }
   
+  @Override
   public synchronized boolean finished() {
     // Check if 'zlib' says its 'finished' and
     // all compressed data has been consumed
     return (finished && compressedDirectBuf.remaining() == 0);
   }
 
+  @Override
   public synchronized int compress(byte[] b, int off, int len) 
     throws IOException {
     if (b == null) {
@@ -385,6 +391,7 @@ public class ZlibCompressor implements C
    *
    * @return the total (non-negative) number of compressed bytes output so far
    */
+  @Override
   public synchronized long getBytesWritten() {
     checkStream();
     return getBytesWritten(stream);
@@ -395,11 +402,13 @@ public class ZlibCompressor implements C
    *
    * @return the total (non-negative) number of uncompressed bytes input so far
    */
+  @Override
   public synchronized long getBytesRead() {
     checkStream();
     return getBytesRead(stream);
   }
 
+  @Override
   public synchronized void reset() {
     checkStream();
     reset(stream);
@@ -413,6 +422,7 @@ public class ZlibCompressor implements C
     userBufOff = userBufLen = 0;
   }
   
+  @Override
   public synchronized void end() {
     if (stream != 0) {
       end(stream);

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Fri Oct 19 02:25:55 2012
@@ -118,6 +118,7 @@ public class ZlibDecompressor implements
     this(CompressionHeader.DEFAULT_HEADER, DEFAULT_DIRECT_BUFFER_SIZE);
   }
 
+  @Override
   public synchronized void setInput(byte[] b, int off, int len) {
     if (b == null) {
       throw new NullPointerException();
@@ -154,6 +155,7 @@ public class ZlibDecompressor implements
     userBufLen -= compressedDirectBufLen;
   }
 
+  @Override
   public synchronized void setDictionary(byte[] b, int off, int len) {
     if (stream == 0 || b == null) {
       throw new NullPointerException();
@@ -165,6 +167,7 @@ public class ZlibDecompressor implements
     needDict = false;
   }
 
+  @Override
   public synchronized boolean needsInput() {
     // Consume remaining compressed data?
     if (uncompressedDirectBuf.remaining() > 0) {
@@ -184,16 +187,19 @@ public class ZlibDecompressor implements
     return false;
   }
 
+  @Override
   public synchronized boolean needsDictionary() {
     return needDict;
   }
 
+  @Override
   public synchronized boolean finished() {
     // Check if 'zlib' says it's 'finished' and
     // all compressed data has been consumed
     return (finished && uncompressedDirectBuf.remaining() == 0);
   }
 
+  @Override
   public synchronized int decompress(byte[] b, int off, int len) 
     throws IOException {
     if (b == null) {
@@ -255,6 +261,7 @@ public class ZlibDecompressor implements
    *
    * @return the total (non-negative) number of unprocessed bytes in input
    */
+  @Override
   public synchronized int getRemaining() {
     checkStream();
     return userBufLen + getRemaining(stream);  // userBuf + compressedDirectBuf
@@ -263,6 +270,7 @@ public class ZlibDecompressor implements
   /**
    * Resets everything including the input buffers (user and direct).</p>
    */
+  @Override
   public synchronized void reset() {
     checkStream();
     reset(stream);
@@ -274,6 +282,7 @@ public class ZlibDecompressor implements
     userBufOff = userBufLen = 0;
   }
 
+  @Override
   public synchronized void end() {
     if (stream != 0) {
       end(stream);
@@ -281,6 +290,7 @@ public class ZlibDecompressor implements
     }
   }
 
+  @Override
   protected void finalize() {
     end();
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BCFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BCFile.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BCFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/BCFile.java Fri Oct 19 02:25:55 2012
@@ -300,6 +300,7 @@ final class BCFile {
      * Close the BCFile Writer. Attempting to use the Writer after calling
      * <code>close</code> is not allowed and may lead to undetermined results.
      */
+    @Override
     public void close() throws IOException {
       if (closed == true) {
         return;
@@ -447,6 +448,7 @@ final class BCFile {
         this.compressAlgo = compressAlgo;
       }
 
+      @Override
       public void register(long raw, long begin, long end) {
         metaIndex.addEntry(new MetaIndexEntry(name, compressAlgo,
             new BlockRegion(begin, end - begin, raw)));
@@ -463,6 +465,7 @@ final class BCFile {
         // do nothing
       }
 
+      @Override
       public void register(long raw, long begin, long end) {
         dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
       }
@@ -671,6 +674,7 @@ final class BCFile {
     /**
      * Finishing reading the BCFile. Release all resources.
      */
+    @Override
     public void close() {
       // nothing to be done now
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/CompareUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/CompareUtils.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/CompareUtils.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/CompareUtils.java Fri Oct 19 02:25:55 2012
@@ -68,6 +68,7 @@ class CompareUtils {
       magnitude = m;
     }
 
+    @Override
     public long magnitude() {
       return magnitude;
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFile.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFile.java Fri Oct 19 02:25:55 2012
@@ -297,6 +297,7 @@ public class TFile {
      * 
      * The underlying FSDataOutputStream is not closed.
      */
+    @Override
     public void close() throws IOException {
       if ((state == State.CLOSED)) {
         return;
@@ -820,6 +821,7 @@ public class TFile {
      * Close the reader. The state of the Reader object is undefined after
     * close. Calling close() multiple times has no effect.
      */
+    @Override
     public void close() throws IOException {
       readerBCF.close();
     }
@@ -1573,6 +1575,7 @@ public class TFile {
        * scanner after calling close is not defined. The entry returned by the
        * previous entry() call will be invalid.
        */
+      @Override
       public void close() throws IOException {
         parkCursorAtEnd();
       }
@@ -2102,7 +2105,7 @@ public class TFile {
     }
 
     public boolean isSorted() {
-      return !strComparator.equals("");
+      return !strComparator.isEmpty();
     }
 
     public String getComparatorString() {

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFileDumper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFileDumper.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFileDumper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFileDumper.java Fri Oct 19 02:25:55 2012
@@ -125,7 +125,7 @@ class TFileDumper {
           dataSizeUncompressed += region.getRawSize();
         }
         properties.put("Data Block Bytes", Long.toString(dataSize));
-        if (reader.readerBCF.getDefaultCompressionName() != "none") {
+        if (!reader.readerBCF.getDefaultCompressionName().equals("none")) {
           properties.put("Data Block Uncompressed Bytes", Long
               .toString(dataSizeUncompressed));
           properties.put("Data Block Compression Ratio", String.format(

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java Fri Oct 19 02:25:55 2012
@@ -202,6 +202,7 @@ public class NativeIO {
       this.mode = mode;
     }
 
+    @Override
     public String toString() {
       return "Stat(owner='" + owner + "', group='" + group + "'" +
         ", mode=" + mode + ")";

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIOException.java Fri Oct 19 02:25:55 2012
@@ -38,6 +38,7 @@ public class NativeIOException extends I
     return errno;
   }
 
+  @Override
   public String toString() {
     return errno.toString() + ": " + super.getMessage();
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java Fri Oct 19 02:25:55 2012
@@ -150,6 +150,7 @@ public class RetryPolicies {
   }
   
   static class TryOnceThenFail implements RetryPolicy {
+    @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
         boolean isMethodIdempotent) throws Exception {
       return RetryAction.FAIL;
@@ -157,6 +158,7 @@ public class RetryPolicies {
   }
   
   static class RetryForever implements RetryPolicy {
+    @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
         boolean isMethodIdempotent) throws Exception {
       return RetryAction.RETRY;
@@ -430,6 +432,7 @@ public class RetryPolicies {
       this.exceptionToPolicyMap = exceptionToPolicyMap;
     }
 
+    @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
         boolean isMethodIdempotent) throws Exception {
       RetryPolicy policy = exceptionToPolicyMap.get(e.getClass());
@@ -457,6 +460,7 @@ public class RetryPolicies {
       }
     }
 
+    @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
         boolean isMethodIdempotent) throws Exception {
       RetryPolicy policy = null;

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/DeserializerComparator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/DeserializerComparator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/DeserializerComparator.java Fri Oct 19 02:25:55 2012
@@ -56,6 +56,7 @@ public abstract class DeserializerCompar
     this.deserializer.open(buffer);
   }
 
+  @Override
   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
     try {
       

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerialization.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerialization.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerialization.java Fri Oct 19 02:25:55 2012
@@ -24,11 +24,8 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.util.Map;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.RawComparator;
 
 /**
  * <p>
@@ -45,6 +42,7 @@ public class JavaSerialization implement
 
     private ObjectInputStream ois;
 
+    @Override
     public void open(InputStream in) throws IOException {
       ois = new ObjectInputStream(in) {
         @Override protected void readStreamHeader() {
@@ -53,6 +51,7 @@ public class JavaSerialization implement
       };
     }
     
+    @Override
     @SuppressWarnings("unchecked")
     public T deserialize(T object) throws IOException {
       try {
@@ -63,6 +62,7 @@ public class JavaSerialization implement
       }
     }
 
+    @Override
     public void close() throws IOException {
       ois.close();
     }
@@ -74,6 +74,7 @@ public class JavaSerialization implement
 
     private ObjectOutputStream oos;
 
+    @Override
     public void open(OutputStream out) throws IOException {
       oos = new ObjectOutputStream(out) {
         @Override protected void writeStreamHeader() {
@@ -82,27 +83,32 @@ public class JavaSerialization implement
       };
     }
 
+    @Override
     public void serialize(Serializable object) throws IOException {
       oos.reset(); // clear (class) back-references
       oos.writeObject(object);
     }
 
+    @Override
     public void close() throws IOException {
       oos.close();
     }
 
   }
 
+  @Override
   @InterfaceAudience.Private
   public boolean accept(Class<?> c) {
     return Serializable.class.isAssignableFrom(c);
   }
 
+  @Override
   @InterfaceAudience.Private
   public Deserializer<Serializable> getDeserializer(Class<Serializable> c) {
     return new JavaSerializationDeserializer<Serializable>();
   }
 
+  @Override
   @InterfaceAudience.Private
   public Serializer<Serializable> getSerializer(Class<Serializable> c) {
     return new JavaSerializationSerializer();

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java Fri Oct 19 02:25:55 2012
@@ -44,6 +44,7 @@ public class JavaSerializationComparator
     super(new JavaSerialization.JavaSerializationDeserializer<T>());
   }
 
+  @Override
   @InterfaceAudience.Private
   public int compare(T o1, T o2) {
     return o1.compareTo(o2);

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/SerializationFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/SerializationFactory.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/SerializationFactory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/SerializationFactory.java Fri Oct 19 02:25:55 2012
@@ -40,12 +40,12 @@ import org.apache.hadoop.util.Reflection
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
 @InterfaceStability.Evolving
 public class SerializationFactory extends Configured {
-  
-  private static final Log LOG =
+
+  static final Log LOG =
     LogFactory.getLog(SerializationFactory.class.getName());
 
   private List<Serialization<?>> serializations = new ArrayList<Serialization<?>>();
-  
+
   /**
    * <p>
    * Serializations are found by reading the <code>io.serializations</code>
@@ -55,15 +55,21 @@ public class SerializationFactory extend
    */
   public SerializationFactory(Configuration conf) {
     super(conf);
-    for (String serializerName : conf.getStrings(
-      CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
-      new String[]{WritableSerialization.class.getName(),
-        AvroSpecificSerialization.class.getName(),
-        AvroReflectSerialization.class.getName()})) {
-      add(conf, serializerName);
+    if (conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).equals("")) {
+      LOG.warn("Serialization for various data types may not be available. Please configure "
+          + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY
+          + " properly to have serialization support (it is currently not set).");
+    } else {
+      for (String serializerName : conf.getStrings(
+          CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, new String[] {
+              WritableSerialization.class.getName(),
+              AvroSpecificSerialization.class.getName(),
+              AvroReflectSerialization.class.getName() })) {
+        add(conf, serializerName);
+      }
     }
   }
-  
+
   @SuppressWarnings("unchecked")
   private void add(Configuration conf, String serializationName) {
     try {
@@ -101,5 +107,5 @@ public class SerializationFactory extend
     }
     return null;
   }
-  
+
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/WritableSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/WritableSerialization.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/WritableSerialization.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/WritableSerialization.java Fri Oct 19 02:25:55 2012
@@ -23,8 +23,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Map;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java Fri Oct 19 02:25:55 2012
@@ -47,11 +47,13 @@ public abstract class AvroSerialization<
   @InterfaceAudience.Private
   public static final String AVRO_SCHEMA_KEY = "Avro-Schema";
 
+  @Override
   @InterfaceAudience.Private
   public Deserializer<T> getDeserializer(Class<T> c) {
     return new AvroDeserializer(c);
   }
 
+  @Override
   @InterfaceAudience.Private
   public Serializer<T> getSerializer(Class<T> c) {
     return new AvroSerializer(c);

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Fri Oct 19 02:25:55 2012
@@ -75,6 +75,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -82,6 +83,8 @@ import org.apache.hadoop.util.Reflection
  * 
  * @see Server
  */
+@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
+@InterfaceStability.Evolving
 public class Client {
   
   public static final Log LOG = LogFactory.getLog(Client.class);
@@ -222,7 +225,6 @@ public class Client {
     private IpcConnectionContextProto connectionContext;   // connection context
     private final ConnectionId remoteId;                // connection id
     private AuthMethod authMethod; // authentication method
-    private boolean useSasl;
     private Token<? extends TokenIdentifier> token;
     private SaslRpcClient saslRpcClient;
     
@@ -267,8 +269,7 @@ public class Client {
 
       UserGroupInformation ticket = remoteId.getTicket();
       Class<?> protocol = remoteId.getProtocol();
-      this.useSasl = UserGroupInformation.isSecurityEnabled();
-      if (useSasl && protocol != null) {
+      if (protocol != null) {
         TokenInfo tokenInfo = SecurityUtil.getTokenInfo(protocol, conf);
         if (tokenInfo != null) {
           TokenSelector<? extends TokenIdentifier> tokenSelector = null;
@@ -293,12 +294,12 @@ public class Client {
         }
       }
       
-      if (!useSasl) {
-        authMethod = AuthMethod.SIMPLE;
-      } else if (token != null) {
+      if (token != null) {
         authMethod = AuthMethod.DIGEST;
-      } else {
+      } else if (UserGroupInformation.isSecurityEnabled()) {
         authMethod = AuthMethod.KERBEROS;
+      } else {
+        authMethod = AuthMethod.SIMPLE;
       }
       
       connectionContext = ProtoUtil.makeIpcConnectionContext(
@@ -316,7 +317,7 @@ public class Client {
 
     /** Update lastActivity with the current time. */
     private void touch() {
-      lastActivity.set(System.currentTimeMillis());
+      lastActivity.set(Time.now());
     }
 
     /**
@@ -363,6 +364,7 @@ public class Client {
        * until a byte is read.
        * @throws IOException for any IO problem other than socket timeout
        */
+      @Override
       public int read() throws IOException {
         do {
           try {
@@ -379,6 +381,7 @@ public class Client {
        * 
        * @return the total number of bytes read; -1 if the connection is closed.
        */
+      @Override
       public int read(byte[] buf, int off, int len) throws IOException {
         do {
           try {
@@ -509,6 +512,7 @@ public class Client {
         final Random rand, final UserGroupInformation ugi) throws IOException,
         InterruptedException {
       ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
         public Object run() throws IOException, InterruptedException {
           final short MAX_BACKOFF = 5000;
           closeConnection();
@@ -570,14 +574,12 @@ public class Client {
           InputStream inStream = NetUtils.getInputStream(socket);
           OutputStream outStream = NetUtils.getOutputStream(socket);
           writeConnectionHeader(outStream);
-          if (useSasl) {
+          if (authMethod != AuthMethod.SIMPLE) {
             final InputStream in2 = inStream;
             final OutputStream out2 = outStream;
             UserGroupInformation ticket = remoteId.getTicket();
-            if (authMethod == AuthMethod.KERBEROS) {
-              if (ticket.getRealUser() != null) {
-                ticket = ticket.getRealUser();
-              }
+            if (ticket.getRealUser() != null) {
+              ticket = ticket.getRealUser();
             }
             boolean continueSasl = false;
             try {
@@ -608,7 +610,6 @@ public class Client {
                   connectionContext.getProtocol(), 
                   ProtoUtil.getUgi(connectionContext.getUserInfo()),
                   authMethod);
-              useSasl = false;
             }
           }
         
@@ -762,7 +763,7 @@ public class Client {
     private synchronized boolean waitForWork() {
       if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
         long timeout = maxIdleTime-
-              (System.currentTimeMillis()-lastActivity.get());
+              (Time.now()-lastActivity.get());
         if (timeout>0) {
           try {
             wait(timeout);
@@ -792,7 +793,7 @@ public class Client {
      * since last I/O activity is equal to or greater than the ping interval
      */
     private synchronized void sendPing() throws IOException {
-      long curTime = System.currentTimeMillis();
+      long curTime = Time.now();
       if ( curTime - lastActivity.get() >= pingInterval) {
         lastActivity.set(curTime);
         synchronized (out) {
@@ -802,6 +803,7 @@ public class Client {
       }
     }
 
+    @Override
     public void run() {
       if (LOG.isDebugEnabled())
         LOG.debug(getName() + ": starting, having connections " 
@@ -1167,7 +1169,7 @@ public class Client {
                   call.error);
         }
       } else {
-        return call.rpcResponse;
+        return call.getRpcResult();
       }
     }
   }
@@ -1398,5 +1400,10 @@ public class Client {
       result = PRIME * result + ((ticket == null) ? 0 : ticket.hashCode());
       return result;
     }
+    
+    @Override
+    public String toString() {
+      return serverPrincipal + "@" + address;
+    }
   }  
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Fri Oct 19 02:25:55 2012
@@ -44,19 +44,21 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ProtoUtil;
+import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
 
 /**
  * RPC Engine for protobuf based RPCs.
  */
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
-  private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
+  public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
   
   static { // Register the rpcRequest deserializer for WritableRpcEngine 
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
@@ -185,21 +187,34 @@ public class ProtobufRpcEngine implement
         throws ServiceException {
       long startTime = 0;
       if (LOG.isDebugEnabled()) {
-        startTime = System.currentTimeMillis();
+        startTime = Time.now();
       }
 
       HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
       RpcResponseWritable val = null;
+      
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(Thread.currentThread().getId() + ": Call -> " +
+            remoteId + ": " + method.getName() +
+            " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
+      }
       try {
         val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
             new RpcRequestWritable(rpcRequest), remoteId);
+
       } catch (Throwable e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
+              remoteId + ": " + method.getName() +
+                " {" + e + "}");
+        }
+
         throw new ServiceException(e);
       }
 
       if (LOG.isDebugEnabled()) {
-        long callTime = System.currentTimeMillis() - startTime;
-        LOG.debug("Call: " + method.getName() + " " + callTime);
+        long callTime = Time.now() - startTime;
+        LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
       }
       
       Message prototype = null;
@@ -212,12 +227,20 @@ public class ProtobufRpcEngine implement
       try {
         returnMessage = prototype.newBuilderForType()
             .mergeFrom(val.responseMessage).build();
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(Thread.currentThread().getId() + ": Response <- " +
+              remoteId + ": " + method.getName() +
+                " {" + TextFormat.shortDebugString(returnMessage) + "}");
+        }
+
       } catch (Throwable e) {
         throw new ServiceException(e);
       }
       return returnMessage;
     }
 
+    @Override
     public void close() throws IOException {
       if (!isClosed) {
         isClosed = true;
@@ -446,10 +469,10 @@ public class ProtobufRpcEngine implement
             .mergeFrom(rpcRequest.getRequest()).build();
         Message result;
         try {
-          long startTime = System.currentTimeMillis();
+          long startTime = Time.now();
           server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
           result = service.callBlockingMethod(methodDescriptor, null, param);
-          int processingTime = (int) (System.currentTimeMillis() - startTime);
+          int processingTime = (int) (Time.now() - startTime);
           int qTime = (int) (startTime - receiveTime);
           if (LOG.isDebugEnabled()) {
             LOG.info("Served: " + methodName + " queueTime= " + qTime +

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java Fri Oct 19 02:25:55 2012
@@ -19,7 +19,6 @@
 package org.apache.hadoop.ipc;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.HashSet;
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java Fri Oct 19 02:25:55 2012
@@ -36,7 +36,8 @@ public class ProtocolSignature implement
     WritableFactories.setFactory
       (ProtocolSignature.class,
        new WritableFactory() {
-         public Writable newInstance() { return new ProtocolSignature(); }
+         @Override
+        public Writable newInstance() { return new ProtocolSignature(); }
        });
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Fri Oct 19 02:25:55 2012
@@ -48,8 +48,11 @@ import org.apache.hadoop.security.SaslRp
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
 
 import com.google.protobuf.BlockingService;
 
@@ -71,6 +74,8 @@ import com.google.protobuf.BlockingServi
  * All methods in the protocol should throw only IOException.  No field data of
  * the protocol instance is transmitted.
  */
+@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
+@InterfaceStability.Evolving
 public class RPC {
   public enum RpcKind {
     RPC_BUILTIN ((short) 1),         // Used for built in calls by tests
@@ -369,7 +374,7 @@ public class RPC {
                                int rpcTimeout,
                                RetryPolicy connectionRetryPolicy,
                                long timeout) throws IOException { 
-    long startTime = System.currentTimeMillis();
+    long startTime = Time.now();
     IOException ioe;
     while (true) {
       try {
@@ -387,7 +392,7 @@ public class RPC {
         ioe = nrthe;
       }
       // check if timed out
-      if (System.currentTimeMillis()-timeout >= startTime) {
+      if (Time.now()-timeout >= startTime) {
         throw ioe;
       }
 
@@ -628,7 +633,7 @@ public class RPC {
 
   /** Construct a server for a protocol implementation instance listening on a
    * port and address.
-   * @deprecated protocol interface should be passed.
+   * @deprecated Please use {@link Builder} to build the {@link Server}
    */
   @Deprecated
   public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) 
@@ -638,7 +643,7 @@ public class RPC {
 
   /** Construct a server for a protocol implementation instance listening on a
    * port and address.
-   * @deprecated protocol interface should be passed.
+   * @deprecated Please use {@link Builder} to build the {@link Server}
    */
   @Deprecated
   public static Server getServer(final Object instance, final String bindAddress, final int port,
@@ -650,7 +655,10 @@ public class RPC {
                      null);
   }
 
-  /** Construct a server for a protocol implementation instance. */
+  /** Construct a server for a protocol implementation instance.
+   *  @deprecated Please use {@link Builder} to build the {@link Server}
+   */
+  @Deprecated
   public static Server getServer(Class<?> protocol,
                                  Object instance, String bindAddress,
                                  int port, Configuration conf) 
@@ -660,7 +668,7 @@ public class RPC {
   }
 
   /** Construct a server for a protocol implementation instance.
-   * @deprecated secretManager should be passed.
+   * @deprecated Please use {@link Builder} to build the {@link Server}
    */
   @Deprecated
   public static Server getServer(Class<?> protocol,
@@ -673,7 +681,10 @@ public class RPC {
                  conf, null, null);
   }
   
-  /** Construct a server for a protocol implementation instance. */
+  /** Construct a server for a protocol implementation instance. 
+   *  @deprecated Please use {@link Builder} to build the {@link Server}
+   */
+  @Deprecated
   public static Server getServer(Class<?> protocol,
                                  Object instance, String bindAddress, int port,
                                  int numHandlers,
@@ -684,6 +695,10 @@ public class RPC {
         conf, secretManager, null);
   }
   
+  /**
+   *  @deprecated Please use {@link Builder} to build the {@link Server}
+   */
+  @Deprecated
   public static Server getServer(Class<?> protocol,
       Object instance, String bindAddress, int port,
       int numHandlers,
@@ -696,8 +711,10 @@ public class RPC {
                  verbose, conf, secretManager, portRangeConfig);
   }
 
-  /** Construct a server for a protocol implementation instance. */
-
+  /** Construct a server for a protocol implementation instance.
+   *  @deprecated Please use {@link Builder} to build the {@link Server}
+   */
+  @Deprecated
   public static <PROTO extends VersionedProtocol, IMPL extends PROTO> 
         Server getServer(Class<PROTO> protocol,
                                  IMPL instance, String bindAddress, int port,
@@ -712,6 +729,110 @@ public class RPC {
                  null);
   }
 
+  /**
+   * Class to construct instances of RPC server with specific options.
+   */
+  public static class Builder {
+    private Class<?> protocol = null;
+    private Object instance = null;
+    private String bindAddress = "0.0.0.0";
+    private int port = 0;
+    private int numHandlers = 1;
+    private int numReaders = -1;
+    private int queueSizePerHandler = -1;
+    private boolean verbose = false;
+    private final Configuration conf;    
+    private SecretManager<? extends TokenIdentifier> secretManager = null;
+    private String portRangeConfig = null;
+    
+    public Builder(Configuration conf) {
+      this.conf = conf;
+    }
+
+    /** Mandatory field */
+    public Builder setProtocol(Class<?> protocol) {
+      this.protocol = protocol;
+      return this;
+    }
+    
+    /** Mandatory field */
+    public Builder setInstance(Object instance) {
+      this.instance = instance;
+      return this;
+    }
+    
+    /** Default: 0.0.0.0 */
+    public Builder setBindAddress(String bindAddress) {
+      this.bindAddress = bindAddress;
+      return this;
+    }
+    
+    /** Default: 0 */
+    public Builder setPort(int port) {
+      this.port = port;
+      return this;
+    }
+    
+    /** Default: 1 */
+    public Builder setNumHandlers(int numHandlers) {
+      this.numHandlers = numHandlers;
+      return this;
+    }
+    
+    /** Default: -1 */
+    public Builder setnumReaders(int numReaders) {
+      this.numReaders = numReaders;
+      return this;
+    }
+    
+    /** Default: -1 */
+    public Builder setQueueSizePerHandler(int queueSizePerHandler) {
+      this.queueSizePerHandler = queueSizePerHandler;
+      return this;
+    }
+    
+    /** Default: false */
+    public Builder setVerbose(boolean verbose) {
+      this.verbose = verbose;
+      return this;
+    }
+    
+    /** Default: null */
+    public Builder setSecretManager(
+        SecretManager<? extends TokenIdentifier> secretManager) {
+      this.secretManager = secretManager;
+      return this;
+    }
+    
+    /** Default: null */
+    public Builder setPortRangeConfig(String portRangeConfig) {
+      this.portRangeConfig = portRangeConfig;
+      return this;
+    }
+    
+    /**
+     * Build the RPC Server. 
+     * @throws IOException on error
+     * @throws HadoopIllegalArgumentException when mandatory fields are not set
+     */
+    public Server build() throws IOException, HadoopIllegalArgumentException {
+      if (this.conf == null) {
+        throw new HadoopIllegalArgumentException("conf is not set");
+      }
+      if (this.protocol == null) {
+        throw new HadoopIllegalArgumentException("protocol is not set");
+      }
+      if (this.instance == null) {
+        throw new HadoopIllegalArgumentException("instance is not set");
+      }
+      
+      return getProtocolEngine(this.protocol, this.conf).getServer(
+          this.protocol, this.instance, this.bindAddress, this.port,
+          this.numHandlers, this.numReaders, this.queueSizePerHandler,
+          this.verbose, this.conf, this.secretManager, this.portRangeConfig);
+    }
+  }
+  
   /** An RPC Server. */
   public abstract static class Server extends org.apache.hadoop.ipc.Server {
    boolean verbose;

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Fri Oct 19 02:25:55 2012
@@ -46,11 +46,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +64,7 @@ import javax.security.sasl.SaslServer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -84,7 +87,6 @@ import org.apache.hadoop.security.SaslRp
 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
 import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
@@ -95,6 +97,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -104,9 +107,47 @@ import com.google.common.annotations.Vis
  * 
  * @see Client
  */
+@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
+@InterfaceStability.Evolving
 public abstract class Server {
   private final boolean authorize;
   private boolean isSecurityEnabled;
+  private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
+  
+  public void addTerseExceptions(Class<?>... exceptionClass) {
+    exceptionsHandler.addTerseExceptions(exceptionClass);
+  }
+
+  /**
+   * ExceptionsHandler manages Exception groups for special handling
+   * e.g., terse exception group for concise logging messages
+   */
+  static class ExceptionsHandler {
+    private volatile Set<String> terseExceptions = new HashSet<String>();
+
+    /**
+     * Add exception class so server won't log its stack trace.
+     * Modifying the terseExceptions set through this method is thread safe.
+     *
+     * @param exceptionClass exception classes 
+     */
+    void addTerseExceptions(Class<?>... exceptionClass) {
+
+      // Make a copy of terseExceptions for performing modification
+      final HashSet<String> newSet = new HashSet<String>(terseExceptions);
+
+      // Add all class names into the HashSet
+      for (Class<?> name : exceptionClass) {
+        newSet.add(name.toString());
+      }
+      // Replace the terseExceptions set
+      terseExceptions = Collections.unmodifiableSet(newSet);
+    }
+
+    boolean isTerse(Class<?> t) {
+      return terseExceptions.contains(t.toString());
+    }
+  }
   
   /**
    * The first four bytes of Hadoop RPC connections
@@ -411,7 +452,7 @@ public abstract class Server {
       this.callId = id;
       this.rpcRequest = param;
       this.connection = connection;
-      this.timestamp = System.currentTimeMillis();
+      this.timestamp = Time.now();
       this.rpcResponse = null;
       this.rpcKind = kind;
     }
@@ -478,6 +519,7 @@ public abstract class Server {
         this.readSelector = Selector.open();
       }
       
+      @Override
       public void run() {
         LOG.info("Starting " + getName());
         try {
@@ -561,7 +603,7 @@ public abstract class Server {
      */
     private void cleanupConnections(boolean force) {
       if (force || numConnections > thresholdIdleConnections) {
-        long currentTime = System.currentTimeMillis();
+        long currentTime = Time.now();
         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
           return;
         }
@@ -597,7 +639,7 @@ public abstract class Server {
           }
           else i++;
         }
-        lastCleanupRunTime = System.currentTimeMillis();
+        lastCleanupRunTime = Time.now();
       }
     }
 
@@ -682,7 +724,7 @@ public abstract class Server {
         try {
           reader.startAdd();
           SelectionKey readKey = reader.registerChannel(channel);
-          c = new Connection(readKey, channel, System.currentTimeMillis());
+          c = new Connection(readKey, channel, Time.now());
           readKey.attach(c);
           synchronized (connectionList) {
             connectionList.add(numConnections, c);
@@ -704,7 +746,7 @@ public abstract class Server {
       if (c == null) {
         return;  
       }
-      c.setLastContact(System.currentTimeMillis());
+      c.setLastContact(Time.now());
       
       try {
         count = c.readAndProcess();
@@ -726,7 +768,7 @@ public abstract class Server {
         c = null;
       }
       else {
-        c.setLastContact(System.currentTimeMillis());
+        c.setLastContact(Time.now());
       }
     }   
 
@@ -805,7 +847,7 @@ public abstract class Server {
               LOG.info(getName() + ": doAsyncWrite threw exception " + e);
             }
           }
-          long now = System.currentTimeMillis();
+          long now = Time.now();
           if (now < lastPurgeTime + PURGE_INTERVAL) {
             continue;
           }
@@ -951,7 +993,7 @@ public abstract class Server {
             
             if (inHandler) {
               // set the serve time when the response has to be sent later
-              call.timestamp = System.currentTimeMillis();
+              call.timestamp = Time.now();
               
               incPending();
               try {
@@ -1331,20 +1373,38 @@ public abstract class Server {
           dataLengthBuffer.clear();
           if (authMethod == null) {
             throw new IOException("Unable to read authentication method");
-          }
-          if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
-            AccessControlException ae = new AccessControlException("Authorization ("
-              + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
-              + ") is enabled but authentication ("
-              + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
-              + ") is configured as simple. Please configure another method "
-              + "like kerberos or digest.");
-            setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
-                null, ae.getClass().getName(), ae.getMessage());
-            responder.doRespond(authFailedCall);
-            throw ae;
-          }
-          if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+          }          
+          final boolean clientUsingSasl;
+          switch (authMethod) {
+            case SIMPLE: { // no sasl for simple
+              if (isSecurityEnabled) {
+                AccessControlException ae = new AccessControlException("Authorization ("
+                    + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
+                    + ") is enabled but authentication ("
+                    + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
+                    + ") is configured as simple. Please configure another method "
+                    + "like kerberos or digest.");
+                setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
+                    null, ae.getClass().getName(), ae.getMessage());
+                responder.doRespond(authFailedCall);
+                throw ae;
+              }
+              clientUsingSasl = false;
+              useSasl = false; 
+              break;
+            }
+            case DIGEST: {
+              clientUsingSasl = true;
+              useSasl = (secretManager != null);
+              break;
+            }
+            default: {
+              clientUsingSasl = true;
+              useSasl = isSecurityEnabled; 
+              break;
+            }
+          }          
+          if (clientUsingSasl && !useSasl) {
             doSaslReply(SaslStatus.SUCCESS, new IntWritable(
                 SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
             authMethod = AuthMethod.SIMPLE;
@@ -1353,9 +1413,6 @@ public abstract class Server {
             // to simple auth from now on.
             skipInitialSaslHandshake = true;
           }
-          if (authMethod != AuthMethod.SIMPLE) {
-            useSasl = true;
-          }
           
           connectionHeaderBuf = null;
           connectionHeaderRead = true;
@@ -1489,8 +1546,6 @@ public abstract class Server {
             UserGroupInformation realUser = user;
             user = UserGroupInformation.createProxyUser(protocolUser
                 .getUserName(), realUser);
-            // Now the user is a proxy user, set Authentication method Proxy.
-            user.setAuthenticationMethod(AuthenticationMethod.PROXY);
           }
         }
       }
@@ -1642,7 +1697,7 @@ public abstract class Server {
       if (!channel.isOpen())
         return;
       try {socket.shutdownOutput();} catch(Exception e) {
-        LOG.warn("Ignoring socket shutdown exception");
+        LOG.debug("Ignoring socket shutdown exception", e);
       }
       if (channel.isOpen()) {
         try {channel.close();} catch(Exception e) {}
@@ -1703,8 +1758,8 @@ public abstract class Server {
               // on the server side, as opposed to just a normal exceptional
               // result.
               LOG.warn(logMsg, e);
-            } else if (e instanceof StandbyException) {
-              // Don't log the whole stack trace of these exceptions.
+            } else if (exceptionsHandler.isTerse(e.getClass())) {
+             // Don't log the whole stack trace of these exceptions.
               // Way too noisy!
               LOG.info(logMsg);
             } else {
@@ -1840,9 +1895,11 @@ public abstract class Server {
     // Create the responder here
     responder = new Responder();
     
-    if (isSecurityEnabled) {
+    if (secretManager != null) {
       SaslRpcServer.init(conf);
     }
+    
+    this.exceptionsHandler.addTerseExceptions(StandbyException.class);
   }
 
   private void closeConnection(Connection connection) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Fri Oct 19 02:25:55 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.ipc.VersionedPr
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.*;
@@ -141,6 +142,7 @@ public class WritableRpcEngine implement
       return rpcVersion;
     }
 
+    @Override
     @SuppressWarnings("deprecation")
     public void readFields(DataInput in) throws IOException {
       rpcVersion = in.readLong();
@@ -158,6 +160,7 @@ public class WritableRpcEngine implement
       }
     }
 
+    @Override
     @SuppressWarnings("deprecation")
     public void write(DataOutput out) throws IOException {
       out.writeLong(rpcVersion);
@@ -172,6 +175,7 @@ public class WritableRpcEngine implement
       }
     }
 
+    @Override
     public String toString() {
       StringBuilder buffer = new StringBuilder();
       buffer.append(methodName);
@@ -188,10 +192,12 @@ public class WritableRpcEngine implement
       return buffer.toString();
     }
 
+    @Override
     public void setConf(Configuration conf) {
       this.conf = conf;
     }
 
+    @Override
     public Configuration getConf() {
       return this.conf;
     }
@@ -214,23 +220,25 @@ public class WritableRpcEngine implement
       this.client = CLIENTS.getClient(conf, factory);
     }
 
+    @Override
     public Object invoke(Object proxy, Method method, Object[] args)
       throws Throwable {
       long startTime = 0;
       if (LOG.isDebugEnabled()) {
-        startTime = System.currentTimeMillis();
+        startTime = Time.now();
       }
 
       ObjectWritable value = (ObjectWritable)
         client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
       if (LOG.isDebugEnabled()) {
-        long callTime = System.currentTimeMillis() - startTime;
+        long callTime = Time.now() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
       }
       return value.get();
     }
     
     /* close the IPC client that's responsible for this invoker's RPCs */ 
+    @Override
     synchronized public void close() {
       if (!isClosed) {
         isClosed = true;
@@ -464,7 +472,7 @@ public class WritableRpcEngine implement
 
           // Invoke the protocol method
 
-          long startTime = System.currentTimeMillis();
+          long startTime = Time.now();
           Method method = 
               protocolImpl.protocolClass.getMethod(call.getMethodName(),
               call.getParameterClasses());
@@ -472,7 +480,7 @@ public class WritableRpcEngine implement
           server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
           Object value = 
               method.invoke(protocolImpl.protocolImpl, call.getParameters());
-          int processingTime = (int) (System.currentTimeMillis() - startTime);
+          int processingTime = (int) (Time.now() - startTime);
           int qTime = (int) (startTime-receivedTime);
           if (LOG.isDebugEnabled()) {
             LOG.debug("Served: " + call.getMethodName() +

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/jmx/JMXJsonServlet.java Fri Oct 19 02:25:55 2012
@@ -113,12 +113,17 @@ import org.codehaus.jackson.JsonGenerato
  *  All other objects will be converted to a string and output as such.
  *  
  *  The bean's name and modelerType will be returned for all beans.
+ *
+ *  Optional parameter "callback" should be used to deliver JSONP response.
+ *  
  */
 public class JMXJsonServlet extends HttpServlet {
   private static final Log LOG = LogFactory.getLog(JMXJsonServlet.class);
 
   private static final long serialVersionUID = 1L;
 
+  private static final String CALLBACK_PARAM = "callback";
+
   /**
    * MBean server.
    */
@@ -154,11 +159,22 @@ public class JMXJsonServlet extends Http
         return;
       }
       JsonGenerator jg = null;
+      String jsonpcb = null;
+      PrintWriter writer = null;
       try {
-        response.setContentType("application/json; charset=utf8");
+        writer = response.getWriter();
+ 
+        // "callback" parameter implies JSONP output
+        jsonpcb = request.getParameter(CALLBACK_PARAM);
+        if (jsonpcb != null) {
+          response.setContentType("application/javascript; charset=utf8");
+          writer.write(jsonpcb + "(");
+        } else {
+          response.setContentType("application/json; charset=utf8");
+        }
 
-        PrintWriter writer = response.getWriter();
         jg = jsonFactory.createJsonGenerator(writer);
+        jg.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
         jg.useDefaultPrettyPrinter();
         jg.writeStartObject();
 
@@ -188,6 +204,12 @@ public class JMXJsonServlet extends Http
         if (jg != null) {
           jg.close();
         }
+        if (jsonpcb != null) {
+           writer.write(");");
+        }
+        if (writer != null) {
+          writer.close();
+        }
       }
     } catch (IOException e) {
       LOG.error("Caught an exception while processing JMX request", e);

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java Fri Oct 19 02:25:55 2012
@@ -88,6 +88,7 @@ public class LogLevel {
   public static class Servlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
+    @Override
     public void doGet(HttpServletRequest request, HttpServletResponse response
         ) throws ServletException, IOException {
 



Mime
View raw message