hadoop-common-commits mailing list archives

From ste...@apache.org
Subject svn commit: r897029 [2/4] - in /hadoop/common/branches/HADOOP-6194: ./ bin/ src/contrib/cloud/ src/contrib/cloud/lib/ src/contrib/cloud/src/ src/contrib/cloud/src/integration-test/ src/contrib/cloud/src/py/ src/contrib/cloud/src/py/hadoop/ src/contrib/...
Date Thu, 07 Jan 2010 22:04:43 GMT
Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/DF.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/DF.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/DF.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/DF.java Thu Jan  7 22:04:37 2010
@@ -28,17 +28,17 @@
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.util.Shell;
 
-/** Filesystem disk space usage statistics.  Uses the unix 'df' program.
- * Tested on Linux, FreeBSD, Cygwin. */
+/** Filesystem disk space usage statistics.
+ * Uses the unix 'df' program to get mount points, and java.io.File for
+ * space utilization. Tested on Linux, FreeBSD, Cygwin. */
 public class DF extends Shell {
-  public static final long DF_INTERVAL_DEFAULT = 3 * 1000; // default DF refresh interval 
-  
-  private String dirPath;
+
+  /** Default DF refresh interval. */
+  public static final long DF_INTERVAL_DEFAULT = 3 * 1000;
+
+  private final String dirPath;
+  private final File dirFile;
   private String filesystem;
-  private long capacity;
-  private long used;
-  private long available;
-  private int percentUsed;
   private String mount;
 
   enum OSType {
@@ -79,6 +79,7 @@
   public DF(File path, long dfInterval) throws IOException {
     super(dfInterval);
     this.dirPath = path.getCanonicalPath();
+    this.dirFile = new File(this.dirPath);
   }
 
   protected OSType getOSType() {
@@ -87,35 +88,40 @@
   
   /// ACCESSORS
 
+  /** @return the canonical path to the volume we're checking. */
   public String getDirPath() {
     return dirPath;
   }
-  
-  public String getFilesystem() throws IOException { 
-    run(); 
-    return filesystem; 
+
+  /** @return a string indicating which filesystem volume we're checking. */
+  public String getFilesystem() throws IOException {
+    run();
+    return filesystem;
   }
-  
-  public long getCapacity() throws IOException { 
-    run(); 
-    return capacity; 
+
+  /** @return the capacity of the measured filesystem in bytes. */
+  public long getCapacity() {
+    return dirFile.getTotalSpace();
   }
-  
-  public long getUsed() throws IOException { 
-    run(); 
-    return used;
+
+  /** @return the total used space on the filesystem in bytes. */
+  public long getUsed() {
+    return dirFile.getTotalSpace() - dirFile.getFreeSpace();
   }
-  
-  public long getAvailable() throws IOException { 
-    run(); 
-    return available;
+
+  /** @return the usable space remaining on the filesystem in bytes. */
+  public long getAvailable() {
+    return dirFile.getUsableSpace();
   }
-  
-  public int getPercentUsed() throws IOException {
-    run();
-    return percentUsed;
+
+  /** @return the percentage of the volume that is in use. */
+  public int getPercentUsed() {
+    double cap = (double) getCapacity();
+    double used = (cap - (double) getAvailable());
+    return (int) (used * 100.0 / cap);
   }
 
+  /** @return the filesystem mount point for the indicated volume */
   public String getMount() throws IOException {
     run();
     return mount;
@@ -125,10 +131,10 @@
     return
       "df -k " + mount +"\n" +
       filesystem + "\t" +
-      capacity / 1024 + "\t" +
-      used / 1024 + "\t" +
-      available / 1024 + "\t" +
-      percentUsed + "%\t" +
+      getCapacity() / 1024 + "\t" +
+      getUsed() / 1024 + "\t" +
+      getAvailable() / 1024 + "\t" +
+      getPercentUsed() + "%\t" +
       mount;
   }
 
@@ -161,13 +167,12 @@
 
     switch(getOSType()) {
       case OS_TYPE_AIX:
-        this.capacity = Long.parseLong(tokens.nextToken()) * 1024;
-        this.available = Long.parseLong(tokens.nextToken()) * 1024;
-        this.percentUsed = Integer.parseInt(tokens.nextToken());
+        Long.parseLong(tokens.nextToken()); // capacity
+        Long.parseLong(tokens.nextToken()); // available
+        Integer.parseInt(tokens.nextToken()); // pct used
         tokens.nextToken();
         tokens.nextToken();
         this.mount = tokens.nextToken();
-        this.used = this.capacity - this.available;
         break;
 
       case OS_TYPE_WIN:
@@ -175,10 +180,10 @@
       case OS_TYPE_MAC:
       case OS_TYPE_UNIX:
       default:
-        this.capacity = Long.parseLong(tokens.nextToken()) * 1024;
-        this.used = Long.parseLong(tokens.nextToken()) * 1024;
-        this.available = Long.parseLong(tokens.nextToken()) * 1024;
-        this.percentUsed = Integer.parseInt(tokens.nextToken());
+        Long.parseLong(tokens.nextToken()); // capacity
+        Long.parseLong(tokens.nextToken()); // used
+        Long.parseLong(tokens.nextToken()); // available
+        Integer.parseInt(tokens.nextToken()); // pct used
         this.mount = tokens.nextToken();
         break;
    }
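
A minimal standalone sketch (not part of this revision; the class name and
example path are arbitrary) of the same space computation the patched DF now
performs through java.io.File instead of parsing 'df' output:

    import java.io.File;

    public class DiskUsageSketch {
      public static void main(String[] args) {
        File dir = new File(args.length > 0 ? args[0] : "/tmp"); // example path
        long capacity  = dir.getTotalSpace();                       // volume size in bytes
        long used      = dir.getTotalSpace() - dir.getFreeSpace();  // bytes in use
        long available = dir.getUsableSpace();                      // bytes usable by this JVM
        int percentUsed = capacity == 0 ? 0
            : (int) ((capacity - available) * 100.0 / capacity);
        System.out.println(capacity + " " + used + " " + available
            + " " + percentUsed + "%");
      }
    }

The mount point and filesystem name still have to come from 'df', which is why
run() and the OS-specific token parsing remain for getMount() and getFilesystem().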

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/FSInputChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/FSInputChecker.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/FSInputChecker.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/FSInputChecker.java Thu Jan  7 22:04:37 2010
@@ -24,6 +24,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
 
 /**
  * This is a generic input stream for verifying checksums for
@@ -38,16 +40,26 @@
   protected Path file;
   private Checksum sum;
   private boolean verifyChecksum = true;
-  private byte[] buf;
+  private int maxChunkSize; // data bytes for checksum (eg 512)
+  private byte[] buf; // buffer for non-chunk-aligned reading
   private byte[] checksum;
-  private int pos;
-  private int count;
+  private IntBuffer checksumInts; // wrapper on checksum buffer
+  private int pos; // the position of the reader inside buf
+  private int count; // the number of bytes currently in buf
   
   private int numOfRetries;
   
   // cached file position
+  // this should always be a multiple of maxChunkSize
   private long chunkPos = 0;
-  
+
+  // Number of checksum chunks that can be read at once into a user
+  // buffer. Chosen by benchmarks - higher values do not reduce
+  // CPU usage. The size of the data reads made to the underlying stream
+  // will be CHUNKS_PER_READ * maxChunkSize.
+  private static final int CHUNKS_PER_READ = 32;
+  protected static final int CHECKSUM_SIZE = 4; // 32-bit checksum
+
   /** Constructor
    * 
    * @param file The name of the file to be read
@@ -72,14 +84,34 @@
     set(verifyChecksum, sum, chunkSize, checksumSize);
   }
   
-  /** Reads in next checksum chunk data into <code>buf</code> at <code>offset</code>
+  /**
+   * Reads in checksum chunks into <code>buf</code> at <code>offset</code>
    * and checksum into <code>checksum</code>.
+   * Since checksums can be disabled, there are two cases implementors need
+   * to worry about:
+   *
+   *  (a) needChecksum() will return false:
+   *     - len can be any positive value
+   *     - checksum will be null
+   *     Implementors should simply pass through to the underlying data stream.
+   * or
+   *  (b) needChecksum() will return true:
+   *    - len >= maxChunkSize
+   *    - checksum.length is a multiple of CHECKSUM_SIZE
+   *    Implementors should read an integer number of data chunks into
+   *    buf. The amount read should be bounded by len or by 
+   *    checksum.length / CHECKSUM_SIZE * maxChunkSize. Note that len may
+   *    be a value that is not a multiple of maxChunkSize, in which case
+   *    the implementation may return less than len.
+   *
    * The method is used for implementing read, therefore, it should be optimized
-   * for sequential reading
+   * for sequential reading.
+   *
    * @param pos chunkPos
    * @param buf desitination buffer
    * @param offset offset in buf at which to store data
-   * @param len maximun number of bytes to read
+   * @param len maximum number of bytes to read
+   * @param checksum the data buffer into which to write checksums
    * @return number of bytes read
    */
   abstract protected int readChunk(long pos, byte[] buf, int offset, int len,
@@ -96,7 +128,7 @@
   protected synchronized boolean needChecksum() {
     return verifyChecksum && sum != null;
   }
-  
+
   /**
    * Read one checksum-verified byte
    * 
@@ -173,7 +205,7 @@
   private void fill(  ) throws IOException {
     assert(pos>=count);
     // fill internal buffer
-    count = readChecksumChunk(buf, 0, buf.length);
+    count = readChecksumChunk(buf, 0, maxChunkSize);
     if (count < 0) count = 0;
   }
   
@@ -185,13 +217,13 @@
   throws IOException {
     int avail = count-pos;
     if( avail <= 0 ) {
-      if(len>=buf.length) {
+      if(len >= maxChunkSize) {
         // read a chunk to user buffer directly; avoid one copy
         int nread = readChecksumChunk(b, off, len);
         return nread;
       } else {
         // read a chunk into the local buffer
-        fill();
+        fill();
         if( count <= 0 ) {
           return -1;
         } else {
@@ -207,10 +239,10 @@
     return cnt;    
   }
   
-  /* Read up one checksum chunk to array <i>b</i> at pos <i>off</i>
-   * It requires a checksum chunk boundary
+  /* Read one or more checksum chunks into array <i>b</i> at pos <i>off</i>
+   * It requires at least one checksum chunk boundary
    * in between <cur_pos, cur_pos+len> 
-   * and it stops reading at the boundary or at the end of the stream;
+   * and it stops reading at the last boundary or at the end of the stream;
    * Otherwise an IllegalArgumentException is thrown.
    * This makes sure that all data read are checksum verified.
    * 
@@ -223,7 +255,7 @@
    *            the stream has been reached.
    * @throws IOException if an I/O error occurs.
    */ 
-  private int readChecksumChunk(byte b[], int off, int len)
+  private int readChecksumChunk(byte b[], final int off, final int len)
   throws IOException {
     // invalidate buffer
     count = pos = 0;
@@ -236,13 +268,12 @@
 
       try {
         read = readChunk(chunkPos, b, off, len, checksum);
-        if( read > 0 ) {
+        if( read > 0) {
           if( needChecksum() ) {
-            sum.update(b, off, read);
-            verifySum(chunkPos);
+            verifySums(b, off, read);
           }
           chunkPos += read;
-        } 
+        }
         retry = false;
       } catch (ChecksumException ce) {
           LOG.info("Found checksum error: b[" + off + ", " + (off+read) + "]="
@@ -266,26 +297,38 @@
     } while (retry);
     return read;
   }
-  
-  /* verify checksum for the chunk.
-   * @throws ChecksumException if there is a mismatch
-   */
-  private void verifySum(long errPos) throws ChecksumException {
-    long crc = getChecksum();
-    long sumValue = sum.getValue();
-    sum.reset();
-    if (crc != sumValue) {
-      throw new ChecksumException(
-          "Checksum error: "+file+" at "+errPos, errPos);
+
+  private void verifySums(final byte b[], final int off, int read)
+    throws ChecksumException
+  {
+    int leftToVerify = read;
+    int verifyOff = 0;
+    checksumInts.rewind();
+    checksumInts.limit((read - 1)/maxChunkSize + 1);
+
+    while (leftToVerify > 0) {
+      sum.update(b, off + verifyOff, Math.min(leftToVerify, maxChunkSize));
+      int expected = checksumInts.get();
+      int calculated = (int)sum.getValue();
+      sum.reset();
+
+      if (expected != calculated) {
+        long errPos = chunkPos + verifyOff;
+        throw new ChecksumException(
+          "Checksum error: "+file+" at "+ errPos +
+          " exp: " + expected + " got: " + calculated, errPos);
+      }
+      leftToVerify -= maxChunkSize;
+      verifyOff += maxChunkSize;
     }
   }
-  
-  /* calculate checksum value */
-  private long getChecksum() {
-    return checksum2long(checksum);
-  }
 
-  /** Convert a checksum byte array to a long */
+  /**
+   * Convert a checksum byte array to a long.
+   * This is deprecated since 0.22 because it is no longer used
+   * by this class.
+   */
+  @Deprecated
   static public long checksum2long(byte[] checksum) {
     long crc = 0L;
     for(int i=0; i<checksum.length; i++) {
@@ -293,7 +336,7 @@
     }
     return crc;
   }
-  
+
   @Override
   public synchronized long getPos() throws IOException {
     return chunkPos-Math.max(0L, count - pos);
@@ -399,11 +442,19 @@
    * @param checksumSize checksum size
    */
   final protected synchronized void set(boolean verifyChecksum,
-      Checksum sum, int maxChunkSize, int checksumSize ) {
+      Checksum sum, int maxChunkSize, int checksumSize) {
+
+    // The code makes assumptions that checksums are always 32-bit.
+    assert !verifyChecksum || sum == null || checksumSize == CHECKSUM_SIZE;
+
+    this.maxChunkSize = maxChunkSize;
     this.verifyChecksum = verifyChecksum;
     this.sum = sum;
     this.buf = new byte[maxChunkSize];
-    this.checksum = new byte[checksumSize];
+    // The size of the checksum array here determines how much we can
+    // read in a single call to readChunk
+    this.checksum = new byte[CHUNKS_PER_READ * checksumSize];
+    this.checksumInts = ByteBuffer.wrap(checksum).asIntBuffer();
     this.count = 0;
     this.pos = 0;
   }
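
A rough standalone sketch (illustrative only; it uses java.util.zip.CRC32,
whereas FSInputChecker verifies with whatever Checksum implementation it was
constructed with) of the per-chunk loop that verifySums() performs: the data
just read is walked one maxChunkSize slice at a time, and each slice's 32-bit
checksum is compared against the next entry in the checksum buffer.

    import java.util.zip.CRC32;

    class ChunkVerifySketch {
      /** Verify each chunk of data[0..bytesRead) against one expected 32-bit CRC per chunk. */
      static void verifyChunks(byte[] data, int bytesRead, int chunkSize, int[] expectedCrcs)
          throws java.io.IOException {
        CRC32 crc = new CRC32();
        int off = 0, chunk = 0;
        while (off < bytesRead) {
          int len = Math.min(chunkSize, bytesRead - off);  // last chunk may be partial
          crc.update(data, off, len);
          if ((int) crc.getValue() != expectedCrcs[chunk]) {
            throw new java.io.IOException("Checksum error at offset " + off);
          }
          crc.reset();
          off += chunkSize;
          chunk++;
        }
      }
    }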

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/FsShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/FsShell.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/FsShell.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/FsShell.java Thu Jan  7 22:04:37 2010
@@ -1505,6 +1505,8 @@
     
     String chgrp = FsShellPermissions.CHGRP_USAGE + "\n" +
       "\t\tThis is equivalent to -chown ... :GROUP ...\n";
+
+    String expunge = "-expunge: Empty the Trash.\n";
     
     String help = "-help [cmd]: \tDisplays help for given command or all commands if none\n" +
       "\t\tis specified.\n";
@@ -1527,6 +1529,8 @@
       System.out.println(dus);
     } else if ("rm".equals(cmd)) {
       System.out.println(rm);
+    } else if ("expunge".equals(cmd)) {
+      System.out.println(expunge);
     } else if ("rmr".equals(cmd)) {
       System.out.println(rmr);
     } else if ("mkdir".equals(cmd)) {

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java Thu Jan  7 22:04:37 2010
@@ -86,20 +86,31 @@
   static final String PATH_DELIMITER = Path.SEPARATOR;
   private static final int S3_MAX_LISTING_LENGTH = 1000;
   
-  private class NativeS3FsInputStream extends FSInputStream {
+  static class NativeS3FsInputStream extends FSInputStream {
     
+    private NativeFileSystemStore store;
+    private Statistics statistics;
     private InputStream in;
     private final String key;
     private long pos = 0;
     
-    public NativeS3FsInputStream(InputStream in, String key) {
+    public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
+      this.store = store;
+      this.statistics = statistics;
       this.in = in;
       this.key = key;
     }
     
     @Override
     public synchronized int read() throws IOException {
-      int result = in.read();
+      int result = -1;
+      try {
+        result = in.read();
+      } catch (IOException e) {
+        LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
+        seek(pos);
+        result = in.read();
+      } 
       if (result != -1) {
         pos++;
       }
@@ -112,7 +123,14 @@
     public synchronized int read(byte[] b, int off, int len)
       throws IOException {
       
-      int result = in.read(b, off, len);
+      int result = -1;
+      try {
+        result = in.read(b, off, len);
+      } catch (IOException e) {
+        LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
+        seek(pos);
+        result = in.read(b, off, len);
+      }
       if (result > 0) {
         pos += result;
       }
@@ -514,7 +532,7 @@
     Path absolutePath = makeAbsolute(f);
     String key = pathToKey(absolutePath);
     return new FSDataInputStream(new BufferedFSInputStream(
-        new NativeS3FsInputStream(store.retrieve(key), key), bufferSize));
+        new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
   }
   
   // rename() and delete() use this method to ensure that the parent directory
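
The pattern above (catch a single IOException, seek back to the current offset
so the S3 object is reopened, then retry the read once) can be sketched
generically as follows; the class and method names are illustrative, not part
of this change:

    import java.io.IOException;
    import java.io.InputStream;

    // Illustrative retry-once wrapper. reopenAt() stands in for whatever
    // re-establishes the underlying stream at the given offset.
    abstract class RetryOnceStream extends InputStream {
      protected InputStream in;
      protected long pos;

      protected abstract InputStream reopenAt(long offset) throws IOException;

      @Override
      public int read() throws IOException {
        int result;
        try {
          result = in.read();
        } catch (IOException e) {
          in = reopenAt(pos);   // reopen once at the current position...
          result = in.read();   // ...and retry; a second failure propagates
        }
        if (result != -1) {
          pos++;
        }
        return result;
      }
    }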

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/http/HttpServer.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/http/HttpServer.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/http/HttpServer.java Thu Jan  7 22:04:37 2010
@@ -27,6 +27,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import javax.servlet.Filter;
 import javax.servlet.FilterChain;
@@ -45,6 +46,7 @@
 import org.apache.hadoop.log.LogLevel;
 import org.apache.hadoop.metrics.MetricsServlet;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.ConfServlet;
 
 import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Handler;
@@ -76,6 +78,10 @@
   static final String FILTER_INITIALIZER_PROPERTY
       = "hadoop.http.filter.initializers";
 
+  // The ServletContext attribute where the daemon Configuration
+  // gets stored.
+  public static final String CONF_CONTEXT_ATTRIBUTE = "hadoop.conf";
+
   protected final Server webServer;
   protected final Connector listener;
   protected final WebAppContext webAppContext;
@@ -84,6 +90,8 @@
       new HashMap<Context, Boolean>();
   protected final List<String> filterNames = new ArrayList<String>();
   private static final int MAX_RETRIES = 10;
+  static final String STATE_DESCRIPTION_ALIVE = " - alive";
+  static final String STATE_DESCRIPTION_NOT_LIVE = " - not live";
 
   /** Same as this(name, bindAddress, port, findPort, null); */
   public HttpServer(String name, String bindAddress, int port, boolean findPort
@@ -119,6 +127,7 @@
     webAppContext = new WebAppContext();
     webAppContext.setContextPath("/");
     webAppContext.setWar(appDir + "/" + name);
+    webAppContext.getServletContext().setAttribute(CONF_CONTEXT_ATTRIBUTE, conf);
     webServer.addHandler(webAppContext);
 
     addDefaultApps(contexts, appDir);
@@ -197,6 +206,7 @@
     addServlet("stacks", "/stacks", StackServlet.class);
     addServlet("logLevel", "/logLevel", LogLevel.Servlet.class);
     addServlet("metrics", "/metrics", MetricsServlet.class);
+    addServlet("conf", "/conf", ConfServlet.class);
   }
 
   public void addContext(Context ctxt, boolean isFiltered)
@@ -469,6 +479,32 @@
           } //Workaround end
           LOG.info("Jetty bound to port " + port);
           webServer.start();
+          // Workaround for HADOOP-6386
+          port = listener.getLocalPort();
+          if (port < 0) {
+            LOG.warn("Bounds port is " + port + " after webserver start");
+            for (int i = 0; i < MAX_RETRIES/2; i++) {
+              try {
+                webServer.stop();
+              } catch (Exception e) {
+                LOG.warn("Can't stop  web-server", e);
+              }
+              Thread.sleep(1000);
+              
+              listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+              listener.open();
+              Thread.sleep(100);
+              webServer.start();
+              LOG.info(i + " attempts to restart webserver");
+              port = listener.getLocalPort();
+              if (port > 0)
+                break;
+            }
+            if (port < 0)
+              throw new Exception("listener.getLocalPort() is returning " +
+                  "less than 0 even after " + MAX_RETRIES + " resets");
+          }
+          // End of HADOOP-6386 workaround
           break;
         } catch (IOException ex) {
           // if this is a bind exception,
@@ -515,7 +551,7 @@
    * @return true if the web server is started, false otherwise
    */
   public boolean isAlive() {
-    return webServer.isStarted();
+    return webServer != null && webServer.isStarted();
   }
 
   /**
@@ -525,7 +561,8 @@
   @Override
   public String toString() {
     return listener != null ?
-        ("HttpServer at http://" + listener.getHost() + ":" + listener.getLocalPort() + "/")
+        ("HttpServer at http://" + listener.getHost() + ":" + listener.getLocalPort() + "/"
+            + (isAlive() ? STATE_DESCRIPTION_ALIVE : STATE_DESCRIPTION_NOT_LIVE))
         : "Inactive HttpServer";
   }
 
@@ -620,6 +657,25 @@
         }
         return result;
       }
+      
+      /**
+       * Quote the url so that users specifying the HOST HTTP header
+       * can't inject attacks.
+       */
+      @Override
+      public StringBuffer getRequestURL(){
+        String url = rawRequest.getRequestURL().toString();
+        return new StringBuffer(HtmlQuoting.quoteHtmlChars(url));
+      }
+      
+      /**
+       * Quote the server name so that users specifying the HOST HTTP header
+       * can't inject attacks.
+       */
+      @Override
+      public String getServerName() {
+        return HtmlQuoting.quoteHtmlChars(rawRequest.getServerName());
+      }
     }
 
     @Override
@@ -637,6 +693,10 @@
                          ) throws IOException, ServletException {
       HttpServletRequestWrapper quoted = 
         new RequestQuoter((HttpServletRequest) request);
+      final HttpServletResponse httpResponse = (HttpServletResponse) response;
+      // set the default to UTF-8 so that we don't need to worry about IE7
+      // choosing to interpret the special characters as UTF-7
+      httpResponse.setContentType("text/html;charset=utf-8");
       chain.doFilter(quoted, response);
     }
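
For context, HtmlQuoting.quoteHtmlChars, used by the RequestQuoter above,
escapes HTML metacharacters; a minimal approximation of that behaviour is
sketched below only to illustrate why the Host header and request URL are now
wrapped before servlets see them (the real class may handle more cases):

    // Minimal approximation of HtmlQuoting.quoteHtmlChars, for illustration only.
    static String quoteHtmlChars(String s) {
      return s.replace("&", "&amp;")
              .replace("<", "&lt;")
              .replace(">", "&gt;")
              .replace("\"", "&quot;")
              .replace("'", "&apos;");
    }

With this in place, a hostile Host header reaches servlets already escaped, so
pages that echo getServerName() or getRequestURL() into HTML cannot be abused
for injection.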
 

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java Thu Jan  7 22:04:37 2010
@@ -24,6 +24,9 @@
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.hadoop.io.RawComparator;
 
 /**
  * <p>
@@ -31,10 +34,10 @@
  * </p>
  * @see JavaSerializationComparator
  */
-public class JavaSerialization implements Serialization<Serializable> {
-  
+public class JavaSerialization extends SerializationBase<Serializable> {
+
   static class JavaSerializationDeserializer<T extends Serializable>
-    implements Deserializer<T> {
+    extends DeserializerBase<T> {
 
     private ObjectInputStream ois;
 
@@ -61,11 +64,16 @@
     }
 
   }
-  
-  static class JavaSerializationSerializer
-    implements Serializer<Serializable> {
+
+  static class JavaSerializationSerializer<T extends Serializable>
+      extends SerializerBase<T> {
 
     private ObjectOutputStream oos;
+    private Map<String, String> metadata;
+
+    public JavaSerializationSerializer(Map<String, String> metadata) {
+      this.metadata = metadata;
+    }
 
     public void open(OutputStream out) throws IOException {
       oos = new ObjectOutputStream(out) {
@@ -75,7 +83,7 @@
       };
     }
 
-    public void serialize(Serializable object) throws IOException {
+    public void serialize(T object) throws IOException {
       oos.reset(); // clear (class) back-references
       oos.writeObject(object);
     }
@@ -84,18 +92,53 @@
       oos.close();
     }
 
+    @Override
+    public Map<String, String> getMetadata() throws IOException {
+      return metadata;
+    }
   }
 
-  public boolean accept(Class<?> c) {
+  public boolean accept(Map<String, String> metadata) {
+    if (!checkSerializationKey(metadata)) {
+      return false;
+    }
+
+    Class<?> c = getClassFromMetadata(metadata);
     return Serializable.class.isAssignableFrom(c);
   }
 
-  public Deserializer<Serializable> getDeserializer(Class<Serializable> c) {
+  public DeserializerBase<Serializable> getDeserializer(
+      Map<String, String> metadata) {
     return new JavaSerializationDeserializer<Serializable>();
   }
 
-  public Serializer<Serializable> getSerializer(Class<Serializable> c) {
-    return new JavaSerializationSerializer();
+  public SerializerBase<Serializable> getSerializer(
+      Map<String, String> metadata) {
+    return new JavaSerializationSerializer<Serializable>(metadata);
   }
 
+  @SuppressWarnings("unchecked")
+  @Override
+  public RawComparator<Serializable> getRawComparator(
+      Map<String, String> metadata) {
+    Class<?> klazz = getClassFromMetadata(metadata);
+    if (null == klazz) {
+      throw new IllegalArgumentException(
+          "Cannot get comparator without " + SerializationBase.CLASS_KEY
+          + " set in metadata");
+    }
+
+    if (Serializable.class.isAssignableFrom(klazz)) {
+      try {
+        return (RawComparator<Serializable>) new JavaSerializationComparator();
+      } catch (IOException ioe) {
+        throw new IllegalArgumentException(
+            "Could not instantiate JavaSerializationComparator for type "
+            + klazz.getName(), ioe);
+      }
+    } else {
+      throw new IllegalArgumentException("Class " + klazz.getName()
+          + " is incompatible with JavaSerialization");
+    }
+  }
 }

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/LegacySerialization.java Thu Jan  7 22:04:37 2010
@@ -21,6 +21,7 @@
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
 
 /**
  * <p>
@@ -82,4 +83,14 @@
     return new LegacyDeserializer<T>(getDeserializer(c));
   }
 
+  @Override
+  public RawComparator<T> getRawComparator(Map<String, String> metadata) {
+    // Since this method is being added to an API meant to provide legacy
+    // compatibility with deprecated serializers, it is left as an incomplete
+    // stub.
+
+    throw new UnsupportedOperationException(
+        "LegacySerialization does not provide raw comparators");
+  }
+
 }

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/SerializationBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/SerializationBase.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/SerializationBase.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/SerializationBase.java Thu Jan  7 22:04:37 2010
@@ -22,6 +22,7 @@
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.RawComparator;
 
 /**
  * <p>
@@ -88,4 +89,29 @@
       throw new IllegalArgumentException(e);
     }
   }
+
+  /** Provide a raw comparator for the specified serializable class.
+   * Requires a serialization-specific metadata entry to name the class
+   * to compare (e.g., "Serialized-Class" for JavaSerialization and
+   * WritableSerialization).
+   * @param metadata a set of string mappings providing serialization-specific
+   * arguments that parameterize the data being serialized/compared.
+   * @return a {@link RawComparator} for the given metadata.
+   * @throws UnsupportedOperationException if it cannot instantiate a RawComparator
+   * for this given metadata.
+   */
+  public abstract RawComparator<T> getRawComparator(Map<String,String> metadata);
+
+  /**
+   * Check that the SERIALIZATION_KEY, if set, matches the current class.
+   * @param metadata the serialization metadata to check.
+   * @return true if SERIALIZATION_KEY is unset, or if it matches the current class
+   * (meaning that accept() should continue processing), or false if it is a mismatch,
+   * meaning that accept() should return false.
+   */
+  protected boolean checkSerializationKey(Map<String, String> metadata) {
+    String intendedSerializer = metadata.get(SERIALIZATION_KEY);
+    return intendedSerializer == null ||
+        getClass().getName().equals(intendedSerializer);
+  }
 }

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java Thu Jan  7 22:04:37 2010
@@ -26,8 +26,12 @@
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A {@link SerializationBase} for {@link Writable}s that delegates to
@@ -35,7 +39,6 @@
  * {@link Writable#readFields(java.io.DataInput)}.
  */
 public class WritableSerialization extends SerializationBase<Writable> {
-  
   static class WritableDeserializer extends DeserializerBase<Writable> {
 
     private Class<?> writableClass;
@@ -79,9 +82,25 @@
     
     private Map<String, String> metadata;
     private DataOutputStream dataOut;
+    private Class<?> serializedClass;
     
-    public WritableSerializer(Map<String, String> metadata) {
+    public WritableSerializer(Configuration conf,
+        Map<String, String> metadata) {
       this.metadata = metadata;
+
+      // If this metadata specifies a serialized class, memoize the
+      // class object for this.
+      String className = this.metadata.get(CLASS_KEY);
+      if (null != className) {
+        try {
+          this.serializedClass = conf.getClassByName(className);
+        } catch (ClassNotFoundException cnfe) {
+          throw new RuntimeException(cnfe);
+        }
+      } else {
+        throw new UnsupportedOperationException("the "
+            + CLASS_KEY + " metadata is missing, but is required.");
+      }
     }
     
     @Override
@@ -95,6 +114,10 @@
 
     @Override
     public void serialize(Writable w) throws IOException {
+      if (serializedClass != w.getClass()) {
+        throw new IOException("Type mismatch in serialization: expected "
+            + serializedClass + "; received " + w.getClass());
+      }
       w.write(dataOut);
     }
 
@@ -112,16 +135,17 @@
 
   @Override
   public boolean accept(Map<String, String> metadata) {
-    if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) {
-      return true;
+    if (!checkSerializationKey(metadata)) {
+      return false;
     }
+
     Class<?> c = getClassFromMetadata(metadata);
     return c == null ? false : Writable.class.isAssignableFrom(c);
   }
 
   @Override
   public SerializerBase<Writable> getSerializer(Map<String, String> metadata) {
-    return new WritableSerializer(metadata);
+    return new WritableSerializer(getConf(), metadata);
   }
   
   @Override
@@ -130,4 +154,17 @@
     return new WritableDeserializer(getConf(), c);
   }
 
+  @Override
+  @SuppressWarnings("unchecked")
+  public RawComparator<Writable> getRawComparator(Map<String, String> metadata) {
+    Class<?> klazz = getClassFromMetadata(metadata);
+    if (null == klazz) {
+      throw new IllegalArgumentException(
+          "Cannot get comparator without " + SerializationBase.CLASS_KEY
+          + " set in metadata");
+    }
+
+    return (RawComparator) WritableComparator.get(
+        (Class<WritableComparable>)klazz);
+  }
 }
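
A hedged usage sketch of the kind of comparator the new getRawComparator()
hands back (here obtained directly from WritableComparator; IntWritable and
the values are just examples): it compares the serialized bytes without
deserializing either value.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparator;

    public class RawCompareSketch {
      public static void main(String[] args) throws IOException {
        byte[] a = serialize(new IntWritable(3));
        byte[] b = serialize(new IntWritable(7));

        WritableComparator cmp = WritableComparator.get(IntWritable.class);
        // Negative result: the serialized 3 orders before the serialized 7.
        System.out.println(cmp.compare(a, 0, a.length, b, 0, b.length));
      }

      static byte[] serialize(IntWritable w) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        w.write(new DataOutputStream(baos));
        return baos.toByteArray();
      }
    }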

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java Thu Jan  7 22:04:37 2010
@@ -30,16 +30,18 @@
 
 /**
  * Serialization for Avro Generic classes. For a class to be accepted by this 
- * serialization it must have metadata with key
- * {@link SerializationBase#SERIALIZATION_KEY} set to {@link AvroGenericSerialization}'s
- * fully-qualified classname.
+ * serialization it must have a schema specified.
  * The schema used is the one set by {@link AvroSerialization#AVRO_SCHEMA_KEY}.
  */
 @SuppressWarnings("unchecked")
 public class AvroGenericSerialization extends AvroSerialization<Object> {
-  
+
   @Override
   public boolean accept(Map<String, String> metadata) {
+    if (!checkSerializationKey(metadata)) {
+      return false;
+    }
+
     return metadata.get(AVRO_SCHEMA_KEY) != null;
   }
 
@@ -50,9 +52,8 @@
   }
 
   @Override
-  protected Schema getSchema(Object t, Map<String, String> metadata) {
-    String jsonSchema = metadata.get(AVRO_SCHEMA_KEY);
-    return jsonSchema != null ? Schema.parse(jsonSchema) : GenericData.get().induce(t);
+  protected Schema getSchema(Map<String, String> metadata) {
+    return Schema.parse(metadata.get(AVRO_SCHEMA_KEY));
   }
 
   @Override

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java Thu Jan  7 22:04:37 2010
@@ -54,8 +54,8 @@
     if (packages == null) {
       getPackages();
     }
-    if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) {
-      return true;
+    if (!checkSerializationKey(metadata)) {
+      return false;
     }
     Class<?> c = getClassFromMetadata(metadata);
     if (c == null) {
@@ -85,8 +85,9 @@
   }
 
   @Override
-  protected Schema getSchema(Object t, Map<String, String> metadata) {
-    return ReflectData.get().getSchema(t.getClass());
+  protected Schema getSchema(Map<String, String> metadata) {
+    Class<?> c = getClassFromMetadata(metadata);
+    return ReflectData.get().getSchema(c);
   }
 
   @Override

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java Thu Jan  7 22:04:37 2010
@@ -28,6 +28,7 @@
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.serializer.DeserializerBase;
 import org.apache.hadoop.io.serializer.SerializationBase;
 import org.apache.hadoop.io.serializer.SerializerBase;
@@ -50,7 +51,7 @@
   /**
    * Return an Avro Schema instance for the given class and metadata.
    */
-  protected abstract Schema getSchema(T t, Map<String, String> metadata);
+  protected abstract Schema getSchema(Map<String, String> metadata);
 
   /**
    * Create and return Avro DatumWriter for the given metadata.
@@ -68,10 +69,13 @@
     private DatumWriter<T> writer;
     private BinaryEncoder encoder;
     private OutputStream outStream;
+    private Schema schema;
 
     AvroSerializer(Map<String, String> metadata) {
       this.metadata = metadata;
-      writer = getWriter(metadata);
+      this.writer = getWriter(metadata);
+      this.schema = getSchema(this.metadata);
+      writer.setSchema(this.schema);
     }
 
     @Override
@@ -88,7 +92,6 @@
 
     @Override
     public void serialize(T t) throws IOException {
-      writer.setSchema(getSchema(t, metadata));
       writer.write(t, encoder);
     }
 
@@ -127,4 +130,18 @@
 
   }
 
+  @Override
+  @SuppressWarnings("unchecked")
+  /**
+   * Provides a raw comparator for Avro-encoded serialized data.
+   * Requires that {@link AvroSerialization#AVRO_SCHEMA_KEY} be provided
+   * in the metadata argument.
+   * @param metadata the Avro-serialization-specific parameters being
+   * provided that detail the schema for the data to deserialize and compare.
+   * @return a RawComparator parameterized for the specified Avro schema.
+   */
+  public RawComparator<T> getRawComparator(Map<String, String> metadata) {
+    Schema schema = getSchema(metadata);
+    return new AvroComparator(schema);
+  }
 }

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java Thu Jan  7 22:04:37 2010
@@ -24,6 +24,7 @@
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
@@ -38,8 +39,8 @@
 
   @Override
   public boolean accept(Map<String, String> metadata) {
-    if (getClass().getName().equals(metadata.get(SERIALIZATION_KEY))) {
-      return true;
+    if (!checkSerializationKey(metadata)) {
+      return false;
     }
     Class<?> c = getClassFromMetadata(metadata);
     return c == null ? false : SpecificRecord.class.isAssignableFrom(c);
@@ -55,8 +56,9 @@
   }
 
   @Override
-  protected Schema getSchema(SpecificRecord t, Map<String, String> metadata) {
-    return t.getSchema();
+  protected Schema getSchema(Map<String, String> metadata) {
+    Class<?> c = getClassFromMetadata(metadata);
+    return SpecificData.get().getSchema(c);
   }
 
   @Override

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/ipc/RPC.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/ipc/RPC.java Thu Jan  7 22:04:37 2010
@@ -20,9 +20,6 @@
 
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
-import java.lang.reflect.Array;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -32,7 +29,6 @@
 import java.util.HashMap;
 
 import javax.net.SocketFactory;
-import javax.security.auth.Subject;
 import javax.security.auth.login.LoginException;
 
 import org.apache.commons.logging.*;
@@ -44,6 +40,7 @@
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** A simple RPC mechanism.
  *
@@ -64,185 +61,54 @@
  * the protocol instance is transmitted.
  */
 public class RPC {
-  private static final Log LOG =
-    LogFactory.getLog(RPC.class);
+  private static final Log LOG = LogFactory.getLog(RPC.class);
 
   private RPC() {}                                  // no public ctor
 
-
-  /** A method invocation, including the method name and its parameters.*/
-  private static class Invocation implements Writable, Configurable {
-    private String methodName;
-    private Class[] parameterClasses;
-    private Object[] parameters;
-    private Configuration conf;
-
-    public Invocation() {}
-
-    public Invocation(Method method, Object[] parameters) {
-      this.methodName = method.getName();
-      this.parameterClasses = method.getParameterTypes();
-      this.parameters = parameters;
-    }
-
-    /** The name of the method invoked. */
-    public String getMethodName() { return methodName; }
-
-    /** The parameter classes. */
-    public Class[] getParameterClasses() { return parameterClasses; }
-
-    /** The parameter instances. */
-    public Object[] getParameters() { return parameters; }
-
-    public void readFields(DataInput in) throws IOException {
-      methodName = UTF8.readString(in);
-      parameters = new Object[in.readInt()];
-      parameterClasses = new Class[parameters.length];
-      ObjectWritable objectWritable = new ObjectWritable();
-      for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
-        parameterClasses[i] = objectWritable.getDeclaredClass();
-      }
-    }
-
-    public void write(DataOutput out) throws IOException {
-      UTF8.writeString(out, methodName);
-      out.writeInt(parameterClasses.length);
-      for (int i = 0; i < parameterClasses.length; i++) {
-        ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
-                                   conf);
-      }
-    }
-
-    public String toString() {
-      StringBuffer buffer = new StringBuffer();
-      buffer.append(methodName);
-      buffer.append("(");
-      for (int i = 0; i < parameters.length; i++) {
-        if (i != 0)
-          buffer.append(", ");
-        buffer.append(parameters[i]);
-      }
-      buffer.append(")");
-      return buffer.toString();
-    }
-
-    public void setConf(Configuration conf) {
-      this.conf = conf;
-    }
-
-    public Configuration getConf() {
-      return this.conf;
-    }
-
-  }
-
-  /* Cache a client using its socket factory as the hash key */
-  static private class ClientCache {
-    private Map<SocketFactory, Client> clients =
-      new HashMap<SocketFactory, Client>();
-
-    /**
-     * Construct & cache an IPC client with the user-provided SocketFactory 
-     * if no cached client exists.
-     * 
-     * @param conf Configuration
-     * @return an IPC client
-     */
-    private synchronized Client getClient(Configuration conf,
-        SocketFactory factory) {
-      // Construct & cache client.  The configuration is only used for timeout,
-      // and Clients have connection pools.  So we can either (a) lose some
-      // connection pooling and leak sockets, or (b) use the same timeout for all
-      // configurations.  Since the IPC is usually intended globally, not
-      // per-job, we choose (a).
-      Client client = clients.get(factory);
-      if (client == null) {
-        client = new Client(ObjectWritable.class, conf, factory);
-        clients.put(factory, client);
-      } else {
-        client.incCount();
-      }
-      return client;
-    }
-
-    /**
-     * Construct & cache an IPC client with the default SocketFactory 
-     * if no cached client exists.
-     * 
-     * @param conf Configuration
-     * @return an IPC client
-     */
-    private synchronized Client getClient(Configuration conf) {
-      return getClient(conf, SocketFactory.getDefault());
-    }
-
-    /**
-     * Stop a RPC client connection 
-     * A RPC client is closed only when its reference count becomes zero.
-     */
-    private void stopClient(Client client) {
-      synchronized (this) {
-        client.decCount();
-        if (client.isZeroReference()) {
-          clients.remove(client.getSocketFactory());
-        }
-      }
-      if (client.isZeroReference()) {
-        client.stop();
-      }
-    }
-  }
-
-  private static ClientCache CLIENTS=new ClientCache();
-  
-  private static class Invoker implements InvocationHandler {
-    private Class<? extends VersionedProtocol> protocol;
-    private InetSocketAddress address;
-    private UserGroupInformation ticket;
-    private Client client;
-    private boolean isClosed = false;
-
-    public Invoker(Class<? extends VersionedProtocol> protocol,
-        InetSocketAddress address, UserGroupInformation ticket,
-        Configuration conf, SocketFactory factory) {
-      this.protocol = protocol;
-      this.address = address;
-      this.ticket = ticket;
-      this.client = CLIENTS.getClient(conf, factory);
-    }
-
-    public Object invoke(Object proxy, Method method, Object[] args)
-      throws Throwable {
-      final boolean logDebug = LOG.isDebugEnabled();
-      long startTime = 0;
-      if (logDebug) {
-        startTime = System.currentTimeMillis();
-      }
-
-      ObjectWritable value = (ObjectWritable)
-        client.call(new Invocation(method, args), address, 
-                    protocol, ticket);
-      if (logDebug) {
-        long callTime = System.currentTimeMillis() - startTime;
-        LOG.debug("Call: " + method.getName() + " " + callTime);
-      }
-      return value.get();
-    }
-    
-    /* close the IPC client that's responsible for this invoker's RPCs */ 
-    synchronized private void close() {
-      if (!isClosed) {
-        isClosed = true;
-        CLIENTS.stopClient(client);
-      }
-    }
+  // cache of RpcEngines by protocol
+  private static final Map<Class,RpcEngine> PROTOCOL_ENGINES
+    = new HashMap<Class,RpcEngine>();
+
+  // track what RpcEngine is used by a proxy class, for stopProxy()
+  private static final Map<Class,RpcEngine> PROXY_ENGINES
+    = new HashMap<Class,RpcEngine>();
+
+  private static final String ENGINE_PROP = "rpc.engine";
+
+  // set a protocol to use a non-default RpcEngine
+  static void setProtocolEngine(Configuration conf,
+                                Class protocol, Class engine) {
+    conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
+  }
+
+  // return the RpcEngine configured to handle a protocol
+  private static synchronized RpcEngine getProtocolEngine(Class protocol,
+                                                          Configuration conf) {
+    RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
+    if (engine == null) {
+      Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
+                                    WritableRpcEngine.class);
+      engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
+      if (protocol.isInterface())
+        PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
+                                              protocol),
+                          engine);
+      PROTOCOL_ENGINES.put(protocol, engine);
+    }
+    return engine;
+  }
+
+  // return the RpcEngine that handles a proxy object
+  private static synchronized RpcEngine getProxyEngine(Object proxy) {
+    return PROXY_ENGINES.get(proxy.getClass());
   }
 
   /**
    * A version mismatch for the RPC protocol.
    */
   public static class VersionMismatch extends IOException {
+    private static final long serialVersionUID = 0;
+
     private String interfaceName;
     private long clientVersion;
     private long serverVersion;
@@ -286,8 +152,8 @@
     }
   }
   
-  public static VersionedProtocol waitForProxy(
-      Class<? extends VersionedProtocol> protocol,
+  public static Object waitForProxy(
+      Class protocol,
       long clientVersion,
       InetSocketAddress addr,
       Configuration conf
@@ -305,13 +171,9 @@
    * @return the proxy
    * @throws IOException if the far end through a RemoteException
    */
-  public static VersionedProtocol waitForProxy(
-                      Class<? extends VersionedProtocol> protocol,
-                                               long clientVersion,
-                                               InetSocketAddress addr,
-                                               Configuration conf,
-                                               long timeout
-                                               ) throws IOException { 
+  public static Object waitForProxy(Class protocol, long clientVersion,
+                             InetSocketAddress addr, Configuration conf,
+                             long timeout) throws IOException { 
     long startTime = System.currentTimeMillis();
     IOException ioe;
     while (true) {
@@ -337,12 +199,12 @@
       }
     }
   }
+
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static VersionedProtocol getProxy(
-      Class<? extends VersionedProtocol> protocol,
-      long clientVersion, InetSocketAddress addr, Configuration conf,
-      SocketFactory factory) throws IOException {
+  public static Object getProxy(Class protocol, long clientVersion,
+                                InetSocketAddress addr, Configuration conf,
+                                SocketFactory factory) throws IOException {
     UserGroupInformation ugi = null;
     try {
       ugi = UserGroupInformation.login(conf);
@@ -354,23 +216,13 @@
   
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static VersionedProtocol getProxy(
-      Class<? extends VersionedProtocol> protocol,
-      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
-      Configuration conf, SocketFactory factory) throws IOException {    
-
-    VersionedProtocol proxy =
-        (VersionedProtocol) Proxy.newProxyInstance(
-            protocol.getClassLoader(), new Class[] { protocol },
-            new Invoker(protocol, addr, ticket, conf, factory));
-    long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
-                                                  clientVersion);
-    if (serverVersion == clientVersion) {
-      return proxy;
-    } else {
-      throw new VersionMismatch(protocol.getName(), clientVersion, 
-                                serverVersion);
-    }
+  public static Object getProxy(Class protocol, long clientVersion,
+                                InetSocketAddress addr,
+                                UserGroupInformation ticket,
+                                Configuration conf,
+                                SocketFactory factory) throws IOException {    
+    return getProtocolEngine(protocol,conf)
+      .getProxy(protocol, clientVersion, addr, ticket, conf, factory);
   }
 
   /**
@@ -383,10 +235,9 @@
    * @return a proxy instance
    * @throws IOException
    */
-  public static VersionedProtocol getProxy(
-      Class<? extends VersionedProtocol> protocol,
-      long clientVersion, InetSocketAddress addr, Configuration conf)
-      throws IOException {
+  public static Object getProxy(Class protocol, long clientVersion,
+                                InetSocketAddress addr, Configuration conf)
+    throws IOException {
 
     return getProxy(protocol, clientVersion, addr, conf, NetUtils
         .getDefaultSocketFactory(conf));
@@ -396,9 +247,9 @@
    * Stop this proxy and release its invoker's resource
    * @param proxy the proxy to be stopped
    */
-  public static void stopProxy(VersionedProtocol proxy) {
+  public static void stopProxy(Object proxy) {
     if (proxy!=null) {
-      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+      getProxyEngine(proxy).stopProxy(proxy);
     }
   }
 
@@ -406,6 +257,7 @@
    * Expert: Make multiple, parallel calls to a set of servers.
    * @deprecated Use {@link #call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration)} instead 
    */
+  @Deprecated
   public static Object[] call(Method method, Object[][] params,
                               InetSocketAddress[] addrs, Configuration conf)
     throws IOException {
@@ -418,169 +270,61 @@
                               UserGroupInformation ticket, Configuration conf)
     throws IOException {
 
-    Invocation[] invocations = new Invocation[params.length];
-    for (int i = 0; i < params.length; i++)
-      invocations[i] = new Invocation(method, params[i]);
-    Client client = CLIENTS.getClient(conf);
-    try {
-    Writable[] wrappedValues = 
-      client.call(invocations, addrs, method.getDeclaringClass(), ticket);
-    
-    if (method.getReturnType() == Void.TYPE) {
-      return null;
-    }
-
-    Object[] values =
-      (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
-    for (int i = 0; i < values.length; i++)
-      if (wrappedValues[i] != null)
-        values[i] = ((ObjectWritable)wrappedValues[i]).get();
-    
-    return values;
-    } finally {
-      CLIENTS.stopClient(client);
-    }
+    return getProtocolEngine(method.getDeclaringClass(), conf)
+      .call(method, params, addrs, ticket, conf);
   }
 
   /** Construct a server for a protocol implementation instance listening on a
-   * port and address. */
+   * port and address.
+   * @deprecated protocol interface should be passed.
+   */
+  @Deprecated
   public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) 
     throws IOException {
     return getServer(instance, bindAddress, port, 1, false, conf);
   }
 
   /** Construct a server for a protocol implementation instance listening on a
-   * port and address. */
+   * port and address.
+   * @deprecated protocol interface should be passed.
+   */
+  @Deprecated
   public static Server getServer(final Object instance, final String bindAddress, final int port,
                                  final int numHandlers,
                                  final boolean verbose, Configuration conf) 
     throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
+    return getServer(instance.getClass(),         // use impl class for protocol
+                     instance, bindAddress, port, numHandlers, false, conf);
   }
 
-  /** An RPC Server. */
-  public static class Server extends org.apache.hadoop.ipc.Server {
-    private Object instance;
-    private boolean verbose;
-    private boolean authorize = false;
-
-    /** Construct an RPC server.
-     * @param instance the instance whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     */
-    public Server(Object instance, Configuration conf, String bindAddress, int port) 
-      throws IOException {
-      this(instance, conf,  bindAddress, port, 1, false);
-    }
-    
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length-1];
-    }
-    
-    /** Construct an RPC server.
-     * @param instance the instance whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     * @param numHandlers the number of method handler threads to run
-     * @param verbose whether each call should be logged
-     */
-    public Server(Object instance, Configuration conf, String bindAddress,  int port,
-                  int numHandlers, boolean verbose) throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
-      this.instance = instance;
-      this.verbose = verbose;
-      this.authorize = 
-        conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, 
-                        false);
-    }
-
-    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
+  /** Construct a server for a protocol implementation instance. */
+  public static Server getServer(Class protocol,
+                                 Object instance, String bindAddress,
+                                 int port, Configuration conf) 
     throws IOException {
-      try {
-        Invocation call = (Invocation)param;
-        if (verbose) log("Call: " + call);
+    return getServer(protocol, instance, bindAddress, port, 1, false, conf);
+  }
 
-        Method method =
-          protocol.getMethod(call.getMethodName(),
-                                   call.getParameterClasses());
-        method.setAccessible(true);
-
-        long startTime = System.currentTimeMillis();
-        Object value = method.invoke(instance, call.getParameters());
-        int processingTime = (int) (System.currentTimeMillis() - startTime);
-        int qTime = (int) (startTime-receivedTime);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Served: " + call.getMethodName() +
-                    " queueTime= " + qTime +
-                    " procesingTime= " + processingTime);
-        }
-        rpcMetrics.rpcQueueTime.inc(qTime);
-        rpcMetrics.rpcProcessingTime.inc(processingTime);
-
-        MetricsTimeVaryingRate m =
-         (MetricsTimeVaryingRate) rpcMetrics.registry.get(call.getMethodName());
-      	if (m == null) {
-      	  try {
-      	    m = new MetricsTimeVaryingRate(call.getMethodName(),
-      	                                        rpcMetrics.registry);
-      	  } catch (IllegalArgumentException iae) {
-      	    // the metrics has been registered; re-fetch the handle
-      	    LOG.info("Error register " + call.getMethodName(), iae);
-      	    m = (MetricsTimeVaryingRate) rpcMetrics.registry.get(
-      	        call.getMethodName());
-      	  }
-      	}
-        m.inc(processingTime);
-
-        if (verbose) log("Return: "+value);
-
-        return new ObjectWritable(method.getReturnType(), value);
-
-      } catch (InvocationTargetException e) {
-        Throwable target = e.getTargetException();
-        if (target instanceof IOException) {
-          throw (IOException)target;
-        } else {
-          IOException ioe = new IOException(target.toString());
-          ioe.setStackTrace(target.getStackTrace());
-          throw ioe;
-        }
-      } catch (Throwable e) {
-        if (!(e instanceof IOException)) {
-          LOG.error("Unexpected throwable object ", e);
-        }
-        IOException ioe = new IOException(e.toString());
-        ioe.setStackTrace(e.getStackTrace());
-        throw ioe;
-      }
-    }
+  /** Construct a server for a protocol implementation instance. */
+  public static Server getServer(Class protocol,
+                                 Object instance, String bindAddress, int port,
+                                 int numHandlers,
+                                 boolean verbose, Configuration conf) 
+    throws IOException {
+    
+    return getProtocolEngine(protocol, conf)
+      .getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
+                 conf);
+  }
 
-    @Override
-    public void authorize(Subject user, ConnectionHeader connection) 
-    throws AuthorizationException {
-      if (authorize) {
-        Class<?> protocol = null;
-        try {
-          protocol = getProtocolClass(connection.getProtocol(), getConf());
-        } catch (ClassNotFoundException cfne) {
-          throw new AuthorizationException("Unknown protocol: " + 
-                                           connection.getProtocol());
-        }
-        ServiceAuthorizationManager.authorize(user, protocol);
-      }
+  /** An RPC Server. */
+  public abstract static class Server extends org.apache.hadoop.ipc.Server {
+  
+    protected Server(String bindAddress, int port, 
+                     Class<? extends Writable> paramClass, int handlerCount, 
+                     Configuration conf, String serverName) throws IOException {
+      super(bindAddress, port, paramClass, handlerCount, conf, serverName);
     }
   }
 
-  private static void log(String value) {
-    if (value!= null && value.length() > 55)
-      value = value.substring(0, 55)+"...";
-    LOG.info(value);
-  }
 }
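
A minimal usage sketch of the engine-based client API above (MyProtocol, its versionID and the
address are illustrative placeholders, not part of this commit; the engine used is whatever
getProtocolEngine() resolves from the configuration):

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.ipc.VersionedProtocol;

    public class RpcProxySketch {
      /** Illustrative protocol; getProxy() now accepts a plain Class rather than
       *  Class<? extends VersionedProtocol>. */
      interface MyProtocol extends VersionedProtocol {
        long versionID = 1L;
        String echo(String msg) throws java.io.IOException;
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        InetSocketAddress addr = new InetSocketAddress("localhost", 9000);
        // Routed through getProtocolEngine(protocol, conf); the return type is Object,
        // so the caller casts to the protocol interface.
        MyProtocol proxy =
            (MyProtocol) RPC.getProxy(MyProtocol.class, MyProtocol.versionID, addr, conf);
        try {
          System.out.println(proxy.echo("ping"));
        } finally {
          RPC.stopProxy(proxy);   // delegates to the owning engine's stopProxy()
        }
      }
    }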

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/ipc/Server.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/ipc/Server.java Thu Jan  7 22:04:37 2010
@@ -67,6 +67,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -90,6 +91,12 @@
    */
   private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
   
+  /**
+   * Initial and max size of response buffer
+   */
+  static int INITIAL_RESP_BUF_SIZE = 10240;
+  static int MAX_RESP_BUF_SIZE = 1024*1024;
+    
   public static final Log LOG = LogFactory.getLog(Server.class);
 
   private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();
@@ -893,8 +900,13 @@
       }
       
       // TODO: Get the user name from the GSS API for Kerberos-based security
-      // Create the user subject
-      user = SecurityUtil.getSubject(header.getUgi());
+      // Create the user subject; however use the groups as defined on the
+      // server-side, don't trust the user groups provided by the client
+      UserGroupInformation ugi = header.getUgi();
+      user = null;
+      if(ugi != null) {
+        user = SecurityUtil.getSubject(conf, header.getUgi().getUserName());
+      }
     }
     
     private void processData() throws  IOException, InterruptedException {
@@ -905,7 +917,7 @@
       if (LOG.isDebugEnabled())
         LOG.debug(" got #" + id);
 
-      Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
+      Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param
       param.readFields(dis);        
         
       Call call = new Call(id, param, this);
@@ -938,7 +950,8 @@
     public void run() {
       LOG.info(getName() + ": starting");
       SERVER.set(Server.this);
-      ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
+      ByteArrayOutputStream buf = 
+        new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
       while (running) {
         try {
           final Call call = callQueue.take(); // pop the queue; maybe blocked here
@@ -979,10 +992,16 @@
             error = StringUtils.stringifyException(e);
           }
           CurCall.set(null);
-
           setupResponse(buf, call, 
                         (error == null) ? Status.SUCCESS : Status.ERROR, 
                         value, errorClass, error);
+          // Discard the large buf and reset it back to the
+          // smaller initial size to free up heap
+          if (buf.size() > MAX_RESP_BUF_SIZE) {
+            LOG.warn("Large response size " + buf.size() + " for call " + 
+                call.toString());
+            buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
+          }
           responder.doRespond(call);
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
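
The buffer handling above, as a standalone sketch (the constants mirror the diff; the oversized
payload is simulated): reset() alone keeps the grown backing array, so the handler replaces the
stream once a response has exceeded MAX_RESP_BUF_SIZE.

    import java.io.ByteArrayOutputStream;

    public class ResponseBufferSketch {
      static final int INITIAL_RESP_BUF_SIZE = 10240;
      static final int MAX_RESP_BUF_SIZE = 1024 * 1024;

      public static void main(String[] args) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
        // per call: buf.reset(); then serialize the response into buf ...
        byte[] hugeResponse = new byte[2 * 1024 * 1024];        // pretend one large response
        buf.write(hugeResponse, 0, hugeResponse.length);
        if (buf.size() > MAX_RESP_BUF_SIZE) {
          // Discard the grown stream so the heap is released; subsequent calls
          // start again from the small initial size.
          buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
        }
      }
    }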

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/security/SecurityUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/security/SecurityUtil.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/security/SecurityUtil.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/security/SecurityUtil.java Thu Jan  7 22:04:37 2010
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.security;
 
+import java.io.IOException;
 import java.security.Policy;
 import java.security.Principal;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -41,6 +43,8 @@
                                    PolicyProvider.DEFAULT_POLICY_PROVIDER));
   }
   
+  private static Groups GROUPS = null;
+  
   /**
    * Set the global security policy for Hadoop.
    * 
@@ -62,6 +66,18 @@
   }
   
   /**
+   * Get the {@link Groups} being used to map user-to-groups.
+   * @return the <code>Groups</code> being used to map user-to-groups.
+   */
+  public static Groups getUserToGroupsMappingService(Configuration conf) {
+    if(GROUPS == null) {
+      LOG.info(" Creating new Groups object");
+      GROUPS = new Groups(conf);
+    }
+    return GROUPS;
+  }
+  
+  /**
    * Get the {@link Subject} for the user identified by <code>ugi</code>.
    * @param ugi user
    * @return the {@link Subject} for the user identified by <code>ugi</code>
@@ -70,9 +86,9 @@
     if (ugi == null) {
       return null;
     }
-    
-    Set<Principal> principals =       // Number of principals = username + #groups 
-      new HashSet<Principal>(ugi.getGroupNames().length+1);
+    // Number of principals = username + #groups + ugi
+    Set<Principal> principals =   
+      new HashSet<Principal>(ugi.getGroupNames().length+1+1);
     User userPrincipal = new User(ugi.getUserName()); 
     principals.add(userPrincipal);
     for (String group : ugi.getGroupNames()) {
@@ -87,6 +103,44 @@
   }
   
   /**
+   * Get the {@link Subject} for the user identified by <code>userName</code>.
+   * @param userName user name
+   * @return the {@link Subject} for the user identified by <code>userName</code>
+   * @throws IOException
+   */
+  public static Subject getSubject(Configuration conf, String userName) 
+    throws IOException {
+    if (userName == null) {
+      return null;
+    }
+    
+    Set<Principal> principals = new HashSet<Principal>();
+    User userPrincipal = new User(userName); 
+    principals.add(userPrincipal);
+    
+    // Get user's groups
+    List<String> groups = getUserToGroupsMappingService(conf).getGroups(userName);
+    StringBuffer sb = new StringBuffer("Groups for '" + userName + "': <");
+    for (String group : groups) {
+      Group groupPrincipal = new Group(group);
+      principals.add(groupPrincipal);
+      sb.append(group + " ");
+    }
+    sb.append(">");
+    LOG.info(sb);
+    
+    // Create the ugi with the right groups
+    UserGroupInformation ugi = 
+      new UnixUserGroupInformation(userName, 
+                                   groups.toArray(new String[groups.size()]));
+    principals.add(ugi);
+    Subject user = 
+      new Subject(false, principals, new HashSet<Object>(), new HashSet<Object>());
+    
+    return user;
+  }
+  
+  /**
    * Class representing a configured access control list.
    */
   public static class AccessControlList {
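
A minimal sketch of the new server-side lookup path (the user name is illustrative; group
membership is resolved through the shared Groups instance rather than taken from the client):

    import javax.security.auth.Subject;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.SecurityUtil;

    public class ServerSideSubjectSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Builds a Subject whose Group principals come from the server's own
        // user-to-groups mapping service, not from anything the client sent.
        Subject subject = SecurityUtil.getSubject(conf, "alice");
        System.out.println(subject);
      }
    }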

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/security/UnixUserGroupInformation.java Thu Jan  7 22:04:37 2010
@@ -95,10 +95,13 @@
    * @exception IllegalArgumentException if any argument is null
    */
   private void setUserGroupNames(String userName, String[] groupNames) {
-    if (userName==null || userName.length()==0 ||
-        groupNames== null || groupNames.length==0) {
+    if (userName==null || userName.length()==0) {
       throw new IllegalArgumentException(
-          "Parameters should not be null or an empty string/array");
+          "username should not be null nor empty");
+    }
+    if(groupNames == null) {
+      throw new IllegalArgumentException(
+      "group names array should not be null");
     }
     for (int i=0; i<groupNames.length; i++) {
       if(groupNames[i] == null || groupNames[i].length() == 0) {
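
With the relaxed check above, an empty group array is now legal while nulls still fail fast. A
small sketch, assuming the public (userName, groupNames) constructor funnels into
setUserGroupNames as before:

    import org.apache.hadoop.security.UnixUserGroupInformation;

    public class UgiValidationSketch {
      public static void main(String[] args) {
        // Accepted after this change: a user with no groups at all.
        UnixUserGroupInformation noGroups =
            new UnixUserGroupInformation("alice", new String[0]);
        System.out.println(noGroups);

        try {
          new UnixUserGroupInformation("alice", null);    // still rejected
        } catch (IllegalArgumentException expected) {
          System.out.println("rejected: " + expected.getMessage());
        }
      }
    }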

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/GenericOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/GenericOptionsParser.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/GenericOptionsParser.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/GenericOptionsParser.java Thu Jan  7 22:04:37 2010
@@ -233,6 +233,12 @@
     .withDescription("comma separated archives to be unarchived" +
                      " on the compute machines.")
     .create("archives");
+    
+    // file with security tokens
+    Option tokensFile = OptionBuilder.withArgName("tokensFile")
+    .hasArg()
+    .withDescription("name of the file with the tokens")
+    .create("tokenCacheFile");
 
     opts.addOption(fs);
     opts.addOption(jt);
@@ -241,6 +247,7 @@
     opts.addOption(libjars);
     opts.addOption(files);
     opts.addOption(archives);
+    opts.addOption(tokensFile);
 
     return opts;
   }
@@ -295,6 +302,19 @@
       }
     }
     conf.setBoolean("mapred.used.genericoptionsparser", true);
+    
+    // tokensFile
+    if(line.hasOption("tokenCacheFile")) {
+      String fileName = line.getOptionValue("tokenCacheFile");
+      // check if the local file exists
+      FileSystem localFs = FileSystem.getLocal(conf);
+      Path p = new Path(fileName);
+      if (!localFs.exists(p)) {
+          throw new FileNotFoundException("File "+fileName+" does not exist.");
+      }
+      LOG.debug("setting conf tokensFile: " + fileName);
+      conf.set("tokenCacheFile", localFs.makeQualified(p).toString());
+    }
   }
   
   /**
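
A rough usage sketch of the new option (the job arguments and file path are illustrative); when
-tokenCacheFile is given and the local file exists, the qualified path ends up in the
configuration under the "tokenCacheFile" key:

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class TokenCacheFileSketch {
      public static void main(String[] args) throws Exception {
        // e.g. args = { "-tokenCacheFile", "/tmp/tokens.bin", "input", "output" }
        Configuration conf = new Configuration();
        String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Parsing throws FileNotFoundException if the named file does not exist locally.
        System.out.println("tokenCacheFile = " + conf.get("tokenCacheFile"));
        System.out.println("remaining args = " + Arrays.toString(remaining));
      }
    }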

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/RunJar.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/RunJar.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/RunJar.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/RunJar.java Thu Jan  7 22:04:37 2010
@@ -18,43 +18,66 @@
 
 package org.apache.hadoop.util;
 
-import java.util.jar.*;
-import java.lang.reflect.*;
+import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.io.*;
-import java.util.*;
-
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.File;
+import java.util.regex.Pattern;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.jar.JarFile;
+import java.util.jar.JarEntry;
+import java.util.jar.Manifest;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.IOUtils;
 
 /** Run a Hadoop job jar. */
 public class RunJar {
 
-  /** Unpack a jar file into a directory. */
+  /** Pattern that matches any string */
+  public static final Pattern MATCH_ANY = Pattern.compile(".*");
+
+  /**
+   * Unpack a jar file into a directory.
+   *
+   * This version unpacks all files inside the jar regardless of filename.
+   */
   public static void unJar(File jarFile, File toDir) throws IOException {
+    unJar(jarFile, toDir, MATCH_ANY);
+  }
+
+  /**
+   * Unpack matching files from a jar. Entries inside the jar that do
+   * not match the given pattern will be skipped.
+   *
+   * @param jarFile the .jar file to unpack
+   * @param toDir the destination directory into which to unpack the jar
+   * @param unpackRegex the pattern to match jar entries against
+   */
+  public static void unJar(File jarFile, File toDir, Pattern unpackRegex)
+    throws IOException {
     JarFile jar = new JarFile(jarFile);
     try {
-      Enumeration entries = jar.entries();
+      Enumeration<JarEntry> entries = jar.entries();
       while (entries.hasMoreElements()) {
         JarEntry entry = (JarEntry)entries.nextElement();
-        if (!entry.isDirectory()) {
+        if (!entry.isDirectory() &&
+            unpackRegex.matcher(entry.getName()).matches()) {
           InputStream in = jar.getInputStream(entry);
           try {
             File file = new File(toDir, entry.getName());
-            if (!file.getParentFile().mkdirs()) {
-              if (!file.getParentFile().isDirectory()) {
-                throw new IOException("Mkdirs failed to create " + 
-                                      file.getParentFile().toString());
-              }
-            }
+            ensureDirectory(file.getParentFile());
             OutputStream out = new FileOutputStream(file);
             try {
-              byte[] buffer = new byte[8192];
-              int i;
-              while ((i = in.read(buffer)) != -1) {
-                out.write(buffer, 0, i);
-              }
+              IOUtils.copyBytes(in, out, 8192);
             } finally {
               out.close();
             }
@@ -68,6 +91,18 @@
     }
   }
 
+  /**
+   * Ensure the existence of a given directory.
+   *
+   * @throws IOException if it cannot be created and does not already exist
+   */
+  private static void ensureDirectory(File dir) throws IOException {
+    if (!dir.mkdirs() && !dir.isDirectory()) {
+      throw new IOException("Mkdirs failed to create " +
+                            dir.toString());
+    }
+  }
+
   /** Run a Hadoop job jar.  If the main class is not in the jar's manifest,
    * then it must be provided on the command line. */
   public static void main(String[] args) throws Throwable {
@@ -107,22 +142,14 @@
     mainClassName = mainClassName.replaceAll("/", ".");
 
     File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
-    boolean b = tmpDir.mkdirs();
-    if (!b && !tmpDir.isDirectory()) { 
-      System.err.println("Mkdirs failed to create " + tmpDir);
-      System.exit(-1);
-    }
+    ensureDirectory(tmpDir);
+
     final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
-    b = workDir.delete();
-    if (!b) {
+    if (!workDir.delete()) {
       System.err.println("Delete failed for " + workDir);
       System.exit(-1);
     }
-    b = workDir.mkdirs();
-    if (!b && !workDir.isDirectory()) {
-      System.err.println("Mkdirs failed to create " + workDir);
-      System.exit(-1);
-    }
+    ensureDirectory(workDir);
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
         public void run() {
@@ -134,15 +161,15 @@
       });
 
     unJar(file, workDir);
-    
+
     ArrayList<URL> classPath = new ArrayList<URL>();
-    classPath.add(new File(workDir+"/").toURL());
-    classPath.add(file.toURL());
-    classPath.add(new File(workDir, "classes/").toURL());
+    classPath.add(new File(workDir+"/").toURI().toURL());
+    classPath.add(file.toURI().toURL());
+    classPath.add(new File(workDir, "classes/").toURI().toURL());
     File[] libs = new File(workDir, "lib").listFiles();
     if (libs != null) {
       for (int i = 0; i < libs.length; i++) {
-        classPath.add(libs[i].toURL());
+        classPath.add(libs[i].toURI().toURL());
       }
     }
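
A minimal sketch of the new selective unpacking entry point (the paths and pattern are
illustrative):

    import java.io.File;
    import java.util.regex.Pattern;
    import org.apache.hadoop.util.RunJar;

    public class UnJarSketch {
      public static void main(String[] args) throws Exception {
        // Unpack only the XML resources; entries that do not match are skipped.
        RunJar.unJar(new File("/tmp/job.jar"),
                     new File("/tmp/job-unpacked"),
                     Pattern.compile(".*\\.xml"));

        // The two-argument form keeps the old behaviour and is equivalent to MATCH_ANY.
        RunJar.unJar(new File("/tmp/job.jar"), new File("/tmp/job-unpacked-all"));
      }
    }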
     

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/Service.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/Service.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/Service.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/Service.java Thu Jan  7 22:04:37 2010
@@ -111,7 +111,10 @@
    * A root cause for failure. May be null.
    */
   private Throwable failureCause;
-  
+
+  /**
+   * State change listeners
+   */
   private List<StateChangeListener> stateListeners;
 
   /**
@@ -780,6 +783,14 @@
     CREATED,
 
     /**
+     * The service is in its start() method, and is not yet out of its early initialization.
+     * A key point here is that a service that is starting can be asked to terminate, which is
+     * done by interrupting the object. Ideally services should not block in this state, but when
+     * they do, we need a way to exit them.
+     */
+    STARTING,
+
+    /**
      * The service is starting up.
      * Its {@link Service#start()} method has been called.
      * When it is ready for work, it will declare itself LIVE.
@@ -793,7 +804,14 @@
      * The service has failed
      */
     FAILED,
-      /**
+
+    /**
+     * Its {@link Service#close()} method has been called and the service is now in the (hopefully short)
+     * shutdown process. 
+     */
+    CLOSING,
+
+    /**
      * the service has been shut down
      * The container process may now destroy the instance
      * Its {@link Service#close()} ()} method has been called.

Modified: hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/Shell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/Shell.java?rev=897029&r1=897028&r2=897029&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/Shell.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/java/org/apache/hadoop/util/Shell.java Thu Jan  7 22:04:37 2010
@@ -47,6 +47,11 @@
   public static String[] getGROUPS_COMMAND() {
     return new String[]{"bash", "-c", "groups"};
   }
+  /** a Unix command to get a given user's groups list */
+  public static String[] getGROUPS_FOR_USER_COMMAND(final String user) {
+    // the output of the 'groups username' command is inconsistent across different Unixes
+    return new String [] {"bash", "-c", "id -Gn " + user};
+  }
   /** a Unix command to set permission */
   public static final String SET_PERMISSION_COMMAND = "chmod";
   /** a Unix command to set owner */
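
A sketch of resolving a user's groups with the new command (the user name is illustrative, and
this assumes the existing Shell.execCommand(String...) helper for running it):

    import java.util.Arrays;
    import org.apache.hadoop.util.Shell;

    public class GroupsForUserSketch {
      public static void main(String[] args) throws Exception {
        // Runs: bash -c "id -Gn alice"  and captures its stdout.
        String out = Shell.execCommand(Shell.getGROUPS_FOR_USER_COMMAND("alice"));
        String[] groups = out.trim().split("\\s+");
        System.out.println(Arrays.toString(groups));
      }
    }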

Propchange: hadoop/common/branches/HADOOP-6194/src/test/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan  7 22:04:37 2010
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/src/test/core:804966-888120
+/hadoop/common/trunk/src/test/core:804966-897004
 /hadoop/core/branches/branch-0.19/core/src/test/core:713112
 /hadoop/core/trunk/src/test/core:776175-785643,785929-786278


