hadoop-mapreduce-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1399950 [6/11] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduc...
Date: Fri, 19 Oct 2012 02:28:42 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java Fri Oct 19 02:25:55 2012
@@ -19,18 +19,19 @@
 package org.apache.hadoop.fs;
 
 import java.io.BufferedReader;
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.Date;
+import java.util.Random;
 import java.util.StringTokenizer;
 
-import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,18 +39,30 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 /**
  * Distributed i/o benchmark.
  * <p>
  * This test writes into or reads from a specified number of files.
- * File size is specified as a parameter to the test. 
+ * Number of bytes to write or read is specified as a parameter to the test. 
  * Each file is accessed in a separate map task.
  * <p>
  * The reducer collects the following statistics:
@@ -72,24 +85,24 @@ import org.apache.hadoop.util.ToolRunner
  * <li>standard deviation of i/o rate </li>
  * </ul>
  */
-public class TestDFSIO extends TestCase implements Tool {
+public class TestDFSIO implements Tool {
   // Constants
   private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
-  private static final int TEST_TYPE_READ = 0;
-  private static final int TEST_TYPE_WRITE = 1;
-  private static final int TEST_TYPE_CLEANUP = 2;
-  private static final int TEST_TYPE_APPEND = 3;
   private static final int DEFAULT_BUFFER_SIZE = 1000000;
   private static final String BASE_FILE_NAME = "test_io_";
   private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
   private static final long MEGA = ByteMultiple.MB.value();
+  private static final int DEFAULT_NR_BYTES = 1;
+  private static final int DEFAULT_NR_FILES = 4;
   private static final String USAGE =
-                            "Usage: " + TestDFSIO.class.getSimpleName() +
-                            " [genericOptions]" +
-                            " -read | -write | -append | -clean [-nrFiles N]" +
-                            " [-fileSize Size[B|KB|MB|GB|TB]]" +
-                            " [-resFile resultFileName] [-bufferSize Bytes]" +
-                            " [-rootDir]";
+                    "Usage: " + TestDFSIO.class.getSimpleName() +
+                    " [genericOptions]" +
+                    " -read [-random | -backward | -skip [-skipSize Size]] |" +
+                    " -write | -append | -clean" +
+                    " [-nrFiles N]" +
+                    " [-size Size[B|KB|MB|GB|TB]]" +
+                    " [-resFile resultFileName] [-bufferSize Bytes]" +
+                    " [-rootDir]";
 
   private Configuration config;
 
@@ -100,6 +113,27 @@ public class TestDFSIO extends TestCase 
     Configuration.addDefaultResource("mapred-site.xml");
   }
 
+  private static enum TestType {
+    TEST_TYPE_READ("read"),
+    TEST_TYPE_WRITE("write"),
+    TEST_TYPE_CLEANUP("cleanup"),
+    TEST_TYPE_APPEND("append"),
+    TEST_TYPE_READ_RANDOM("random read"),
+    TEST_TYPE_READ_BACKWARD("backward read"),
+    TEST_TYPE_READ_SKIP("skip read");
+
+    private String type;
+
+    private TestType(String t) {
+      type = t;
+    }
+
+    @Override // String
+    public String toString() {
+      return type;
+    }
+  }
+
   static enum ByteMultiple {
     B(1L),
     KB(0x400L),
@@ -154,62 +188,100 @@ public class TestDFSIO extends TestCase 
   private static Path getAppendDir(Configuration conf) {
     return new Path(getBaseDir(conf), "io_append");
   }
+  private static Path getRandomReadDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_random_read");
+  }
   private static Path getDataDir(Configuration conf) {
     return new Path(getBaseDir(conf), "io_data");
   }
 
-  /**
-   * Run the test with default parameters.
-   * 
-   * @throws Exception
-   */
-  public void testIOs() throws Exception {
-    TestDFSIO bench = new TestDFSIO();
-    bench.testIOs(1, 4);
+  private static MiniDFSCluster cluster;
+  private static TestDFSIO bench;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    bench = new TestDFSIO();
+    bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
+    cluster = new MiniDFSCluster.Builder(bench.getConf())
+                                .numDataNodes(2)
+                                .format(true)
+                                .build();
+    FileSystem fs = cluster.getFileSystem();
+    bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    if(cluster == null)
+      return;
+    FileSystem fs = cluster.getFileSystem();
+    bench.cleanup(fs);
+    cluster.shutdown();
   }
 
-  /**
-   * Run the test with the specified parameters.
-   * 
-   * @param fileSize file size
-   * @param nrFiles number of files
-   * @throws IOException
-   */
-  public void testIOs(int fileSize, int nrFiles)
-    throws IOException {
-    config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
-    MiniDFSCluster cluster = null;
-    try {
-      cluster = new MiniDFSCluster(config, 2, true, null);
-      FileSystem fs = cluster.getFileSystem();
+  @Test
+  public void testWrite() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.writeTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_WRITE, execTime);
+  }
 
-      createControlFile(fs, fileSize, nrFiles);
-      long tStart = System.currentTimeMillis();
-      writeTest(fs);
-      long execTime = System.currentTimeMillis() - tStart;
-      analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME);
+  @Test
+  public void testRead() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.readTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ, execTime);
+  }
 
-      tStart = System.currentTimeMillis();
-      readTest(fs);
-      execTime = System.currentTimeMillis() - tStart;
-      analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME);
+  @Test
+  public void testReadRandom() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.getConf().setLong("test.io.skip.size", 0);
+    bench.randomReadTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_RANDOM, execTime);
+  }
 
-      tStart = System.currentTimeMillis();
-      appendTest(fs);
-      execTime = System.currentTimeMillis() - tStart;
-      analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME);
+  @Test
+  public void testReadBackward() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.getConf().setLong("test.io.skip.size", -DEFAULT_BUFFER_SIZE);
+    bench.randomReadTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_BACKWARD, execTime);
+  }
 
-      cleanup(fs);
-    } finally {
-      if(cluster != null) cluster.shutdown();
-    }
+  @Test
+  public void testReadSkip() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.getConf().setLong("test.io.skip.size", 1);
+    bench.randomReadTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, execTime);
   }
 
+  @Test
+  public void testAppend() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.appendTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime);
+  }
+
+  @SuppressWarnings("deprecation")
   private void createControlFile(FileSystem fs,
-                                  long fileSize, // in bytes
+                                  long nrBytes, // in bytes
                                   int nrFiles
                                 ) throws IOException {
-    LOG.info("creating control file: "+fileSize+" bytes, "+nrFiles+" files");
+    LOG.info("creating control file: "+nrBytes+" bytes, "+nrFiles+" files");
 
     Path controlDir = getControlDir(config);
     fs.delete(controlDir, true);
@@ -222,7 +294,7 @@ public class TestDFSIO extends TestCase 
         writer = SequenceFile.createWriter(fs, config, controlFile,
                                            Text.class, LongWritable.class,
                                            CompressionType.NONE);
-        writer.append(new Text(name), new LongWritable(fileSize));
+        writer.append(new Text(name), new LongWritable(nrBytes));
       } catch(Exception e) {
         throw new IOException(e.getLocalizedMessage());
       } finally {
@@ -250,10 +322,35 @@ public class TestDFSIO extends TestCase 
    * <li>i/o rate squared</li>
    * </ul>
    */
-  private abstract static class IOStatMapper<T> extends IOMapperBase<T> {
-    IOStatMapper() { 
+  private abstract static class IOStatMapper extends IOMapperBase<Long> {
+    protected CompressionCodec compressionCodec;
+
+    IOStatMapper() {
     }
-    
+
+    @Override // Mapper
+    public void configure(JobConf conf) {
+      super.configure(conf);
+
+      // grab compression
+      String compression = getConf().get("test.io.compression.class", null);
+      Class<? extends CompressionCodec> codec;
+
+      // try to initialize codec
+      try {
+        codec = (compression == null) ? null : 
+          Class.forName(compression).asSubclass(CompressionCodec.class);
+      } catch(Exception e) {
+        throw new RuntimeException("Compression codec not found: ", e);
+      }
+
+      if(codec != null) {
+        compressionCodec = (CompressionCodec)
+            ReflectionUtils.newInstance(codec, getConf());
+      }
+    }
+
+    @Override // IOMapperBase
     void collectStats(OutputCollector<Text, Text> output, 
                       String name,
                       long execTime, 
@@ -280,34 +377,38 @@ public class TestDFSIO extends TestCase 
   /**
    * Write mapper class.
    */
-  public static class WriteMapper extends IOStatMapper<Long> {
+  public static class WriteMapper extends IOStatMapper {
 
     public WriteMapper() { 
       for(int i=0; i < bufferSize; i++)
         buffer[i] = (byte)('0' + i % 50);
     }
 
-    @Override
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      // create file
+      OutputStream out =
+          fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
+      if(compressionCodec != null)
+        out = compressionCodec.createOutputStream(out);
+      LOG.info("out = " + out.getClass().getName());
+      return out;
+    }
+
+    @Override // IOMapperBase
     public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize // in bytes
                      ) throws IOException {
-      // create file
-      OutputStream out;
-      out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
-      
-      try {
-        // write to the file
-        long nrRemaining;
-        for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
-          int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; 
-          out.write(buffer, 0, curSize);
-          reporter.setStatus("writing " + name + "@" + 
-                             (totalSize - nrRemaining) + "/" + totalSize 
-                             + " ::host = " + hostName);
-        }
-      } finally {
-        out.close();
+      OutputStream out = (OutputStream)this.stream;
+      // write to the file
+      long nrRemaining;
+      for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
+        int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
+        out.write(buffer, 0, curSize);
+        reporter.setStatus("writing " + name + "@" + 
+                           (totalSize - nrRemaining) + "/" + totalSize 
+                           + " ::host = " + hostName);
       }
       return Long.valueOf(totalSize);
     }
@@ -321,7 +422,6 @@ public class TestDFSIO extends TestCase 
     runIOTest(WriteMapper.class, writeDir);
   }
   
-  @SuppressWarnings("deprecation")
   private void runIOTest(
           Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, 
           Path outputDir) throws IOException {
@@ -343,33 +443,38 @@ public class TestDFSIO extends TestCase 
   /**
    * Append mapper class.
    */
-  public static class AppendMapper extends IOStatMapper<Long> {
+  public static class AppendMapper extends IOStatMapper {
 
     public AppendMapper() { 
       for(int i=0; i < bufferSize; i++)
         buffer[i] = (byte)('0' + i % 50);
     }
 
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      // open file for append
+      OutputStream out =
+          fs.append(new Path(getDataDir(getConf()), name), bufferSize);
+      if(compressionCodec != null)
+        out = compressionCodec.createOutputStream(out);
+      LOG.info("out = " + out.getClass().getName());
+      return out;
+    }
+
+    @Override // IOMapperBase
     public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize // in bytes
                      ) throws IOException {
-      // create file
-      OutputStream out;
-      out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
-      
-      try {
-        // write to the file
-        long nrRemaining;
-        for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
-          int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; 
-          out.write(buffer, 0, curSize);
-          reporter.setStatus("writing " + name + "@" + 
-                             (totalSize - nrRemaining) + "/" + totalSize 
-                             + " ::host = " + hostName);
-        }
-      } finally {
-        out.close();
+      OutputStream out = (OutputStream)this.stream;
+      // write to the file
+      long nrRemaining;
+      for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
+        int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
+        out.write(buffer, 0, curSize);
+        reporter.setStatus("writing " + name + "@" + 
+                           (totalSize - nrRemaining) + "/" + totalSize 
+                           + " ::host = " + hostName);
       }
       return Long.valueOf(totalSize);
     }
@@ -384,29 +489,35 @@ public class TestDFSIO extends TestCase 
   /**
    * Read mapper class.
    */
-  public static class ReadMapper extends IOStatMapper<Long> {
+  public static class ReadMapper extends IOStatMapper {
 
     public ReadMapper() { 
     }
 
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      // open file
+      InputStream in = fs.open(new Path(getDataDir(getConf()), name));
+      if(compressionCodec != null)
+        in = compressionCodec.createInputStream(in);
+      LOG.info("in = " + in.getClass().getName());
+      return in;
+    }
+
+    @Override // IOMapperBase
     public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize // in bytes
                      ) throws IOException {
-      // open file
-      DataInputStream in = fs.open(new Path(getDataDir(getConf()), name));
+      InputStream in = (InputStream)this.stream;
       long actualSize = 0;
-      try {
-        while (actualSize < totalSize) {
-          int curSize = in.read(buffer, 0, bufferSize);
-          if(curSize < 0) break;
-          actualSize += curSize;
-          reporter.setStatus("reading " + name + "@" + 
-                             actualSize + "/" + totalSize 
-                             + " ::host = " + hostName);
-        }
-      } finally {
-        in.close();
+      while (actualSize < totalSize) {
+        int curSize = in.read(buffer, 0, bufferSize);
+        if(curSize < 0) break;
+        actualSize += curSize;
+        reporter.setStatus("reading " + name + "@" + 
+                           actualSize + "/" + totalSize 
+                           + " ::host = " + hostName);
       }
       return Long.valueOf(actualSize);
     }
@@ -418,20 +529,111 @@ public class TestDFSIO extends TestCase 
     runIOTest(ReadMapper.class, readDir);
   }
 
+  /**
+   * Mapper class for random reads.
+   * The mapper chooses a position in the file and reads bufferSize
+   * bytes starting at the chosen position.
+   * It stops after reading the totalSize bytes, specified by -size.
+   * 
+   * There are three type of reads.
+   * 1) Random read always chooses a random position to read from: skipSize = 0
+   * 2) Backward read reads file in reverse order                : skipSize < 0
+   * 3) Skip-read skips skipSize bytes after every read          : skipSize > 0
+   */
+  public static class RandomReadMapper extends IOStatMapper {
+    private Random rnd;
+    private long fileSize;
+    private long skipSize;
+
+    @Override // Mapper
+    public void configure(JobConf conf) {
+      super.configure(conf);
+      skipSize = conf.getLong("test.io.skip.size", 0);
+    }
+
+    public RandomReadMapper() { 
+      rnd = new Random();
+    }
+
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      Path filePath = new Path(getDataDir(getConf()), name);
+      this.fileSize = fs.getFileStatus(filePath).getLen();
+      InputStream in = fs.open(filePath);
+      if(compressionCodec != null)
+        in = new FSDataInputStream(compressionCodec.createInputStream(in));
+      LOG.info("in = " + in.getClass().getName());
+      LOG.info("skipSize = " + skipSize);
+      return in;
+    }
+
+    @Override // IOMapperBase
+    public Long doIO(Reporter reporter, 
+                       String name, 
+                       long totalSize // in bytes
+                     ) throws IOException {
+      PositionedReadable in = (PositionedReadable)this.stream;
+      long actualSize = 0;
+      for(long pos = nextOffset(-1);
+          actualSize < totalSize; pos = nextOffset(pos)) {
+        int curSize = in.read(pos, buffer, 0, bufferSize);
+        if(curSize < 0) break;
+        actualSize += curSize;
+        reporter.setStatus("reading " + name + "@" + 
+                           actualSize + "/" + totalSize 
+                           + " ::host = " + hostName);
+      }
+      return Long.valueOf(actualSize);
+    }
+
+    /**
+     * Get next offset for reading.
+     * If current < 0 then choose initial offset according to the read type.
+     * 
+     * @param current offset
+     * @return
+     */
+    private long nextOffset(long current) {
+      if(skipSize == 0)
+        return rnd.nextInt((int)(fileSize));
+      if(skipSize > 0)
+        return (current < 0) ? 0 : (current + bufferSize + skipSize);
+      // skipSize < 0
+      return (current < 0) ? Math.max(0, fileSize - bufferSize) :
+                             Math.max(0, current + skipSize);
+    }
+  }
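The three read modes documented above are selected purely by the sign of test.io.skip.size. As a standalone sketch, assuming an illustrative 10 MB file and the 1 MB default buffer (none of this is part of the patch), the nextOffset() rules can be replayed to show the first offsets each mode visits:

import java.util.Random;

public class NextOffsetDemo {
  // Same rules as RandomReadMapper.nextOffset():
  //   skipSize == 0 -> random read, skipSize > 0 -> skip-read, skipSize < 0 -> backward read
  static long nextOffset(long current, long fileSize, int bufferSize,
                         long skipSize, Random rnd) {
    if (skipSize == 0)
      return rnd.nextInt((int) fileSize);
    if (skipSize > 0)
      return (current < 0) ? 0 : current + bufferSize + skipSize;
    return (current < 0) ? Math.max(0, fileSize - bufferSize)
                         : Math.max(0, current + skipSize);
  }

  public static void main(String[] args) {
    long fileSize = 10000000L;      // illustrative 10 MB file
    int bufferSize = 1000000;       // matches DEFAULT_BUFFER_SIZE
    Random rnd = new Random(0);     // fixed seed so the random mode is repeatable here
    for (long skipSize : new long[] {0L, 1L, -bufferSize}) {
      StringBuilder line = new StringBuilder("skipSize=" + skipSize + " ->");
      long pos = -1;
      for (int i = 0; i < 4; i++) {
        pos = nextOffset(pos, fileSize, bufferSize, skipSize, rnd);
        line.append(' ').append(pos);
      }
      System.out.println(line);
    }
  }
}

Backward read starts at fileSize - bufferSize and steps toward offset 0; skip-read starts at 0 and advances bufferSize + skipSize after each read, which is why run() later defaults skipSize to bufferSize for -skip and to -bufferSize for -backward.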
+
+  private void randomReadTest(FileSystem fs) throws IOException {
+    Path readDir = getRandomReadDir(config);
+    fs.delete(readDir, true);
+    runIOTest(RandomReadMapper.class, readDir);
+  }
+
   private void sequentialTest(FileSystem fs, 
-                              int testType, 
+                              TestType testType, 
                               long fileSize, // in bytes
                               int nrFiles
                              ) throws IOException {
-    IOStatMapper<Long> ioer = null;
-    if (testType == TEST_TYPE_READ)
+    IOStatMapper ioer = null;
+    switch(testType) {
+    case TEST_TYPE_READ:
       ioer = new ReadMapper();
-    else if (testType == TEST_TYPE_WRITE)
+      break;
+    case TEST_TYPE_WRITE:
       ioer = new WriteMapper();
-    else if (testType == TEST_TYPE_APPEND)
+      break;
+    case TEST_TYPE_APPEND:
       ioer = new AppendMapper();
-    else
+      break;
+    case TEST_TYPE_READ_RANDOM:
+    case TEST_TYPE_READ_BACKWARD:
+    case TEST_TYPE_READ_SKIP:
+      ioer = new RandomReadMapper();
+      break;
+    default:
       return;
+    }
     for(int i=0; i < nrFiles; i++)
       ioer.doIO(Reporter.NULL,
                 BASE_FILE_NAME+Integer.toString(i), 
@@ -454,13 +656,15 @@ public class TestDFSIO extends TestCase 
 
   @Override // Tool
   public int run(String[] args) throws IOException {
-    int testType = TEST_TYPE_READ;
+    TestType testType = null;
     int bufferSize = DEFAULT_BUFFER_SIZE;
-    long fileSize = 1*MEGA;
+    long nrBytes = 1*MEGA;
     int nrFiles = 1;
+    long skipSize = 0;
     String resFileName = DEFAULT_RES_FILE_NAME;
+    String compressionClass = null;
     boolean isSequential = false;
-    String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
+    String version = TestDFSIO.class.getSimpleName() + ".1.7";
 
     LOG.info(version);
     if (args.length == 0) {
@@ -470,19 +674,32 @@ public class TestDFSIO extends TestCase 
 
     for (int i = 0; i < args.length; i++) {       // parse command line
       if (args[i].startsWith("-read")) {
-        testType = TEST_TYPE_READ;
+        testType = TestType.TEST_TYPE_READ;
       } else if (args[i].equals("-write")) {
-        testType = TEST_TYPE_WRITE;
+        testType = TestType.TEST_TYPE_WRITE;
       } else if (args[i].equals("-append")) {
-        testType = TEST_TYPE_APPEND;
+        testType = TestType.TEST_TYPE_APPEND;
+      } else if (args[i].equals("-random")) {
+        if(testType != TestType.TEST_TYPE_READ) return -1;
+        testType = TestType.TEST_TYPE_READ_RANDOM;
+      } else if (args[i].equals("-backward")) {
+        if(testType != TestType.TEST_TYPE_READ) return -1;
+        testType = TestType.TEST_TYPE_READ_BACKWARD;
+      } else if (args[i].equals("-skip")) {
+        if(testType != TestType.TEST_TYPE_READ) return -1;
+        testType = TestType.TEST_TYPE_READ_SKIP;
       } else if (args[i].equals("-clean")) {
-        testType = TEST_TYPE_CLEANUP;
+        testType = TestType.TEST_TYPE_CLEANUP;
       } else if (args[i].startsWith("-seq")) {
         isSequential = true;
+      } else if (args[i].startsWith("-compression")) {
+        compressionClass = args[++i];
       } else if (args[i].equals("-nrFiles")) {
         nrFiles = Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-fileSize")) {
-        fileSize = parseSize(args[++i]);
+      } else if (args[i].equals("-fileSize") || args[i].equals("-size")) {
+        nrBytes = parseSize(args[++i]);
+      } else if (args[i].equals("-skipSize")) {
+        skipSize = parseSize(args[++i]);
       } else if (args[i].equals("-bufferSize")) {
         bufferSize = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-resFile")) {
@@ -492,36 +709,59 @@ public class TestDFSIO extends TestCase 
         return -1;
       }
     }
+    if(testType == null)
+      return -1;
+    if(testType == TestType.TEST_TYPE_READ_BACKWARD)
+      skipSize = -bufferSize;
+    else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
+      skipSize = bufferSize;
 
     LOG.info("nrFiles = " + nrFiles);
-    LOG.info("fileSize (MB) = " + toMB(fileSize));
+    LOG.info("nrBytes (MB) = " + toMB(nrBytes));
     LOG.info("bufferSize = " + bufferSize);
+    if(skipSize > 0)
+      LOG.info("skipSize = " + skipSize);
     LOG.info("baseDir = " + getBaseDir(config));
+    
+    if(compressionClass != null) {
+      config.set("test.io.compression.class", compressionClass);
+      LOG.info("compressionClass = " + compressionClass);
+    }
 
     config.setInt("test.io.file.buffer.size", bufferSize);
+    config.setLong("test.io.skip.size", skipSize);
     config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
     FileSystem fs = FileSystem.get(config);
 
     if (isSequential) {
       long tStart = System.currentTimeMillis();
-      sequentialTest(fs, testType, fileSize, nrFiles);
+      sequentialTest(fs, testType, nrBytes, nrFiles);
       long execTime = System.currentTimeMillis() - tStart;
       String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
       LOG.info(resultLine);
       return 0;
     }
-    if (testType == TEST_TYPE_CLEANUP) {
+    if (testType == TestType.TEST_TYPE_CLEANUP) {
       cleanup(fs);
       return 0;
     }
-    createControlFile(fs, fileSize, nrFiles);
+    createControlFile(fs, nrBytes, nrFiles);
     long tStart = System.currentTimeMillis();
-    if (testType == TEST_TYPE_WRITE)
+    switch(testType) {
+    case TEST_TYPE_WRITE:
       writeTest(fs);
-    if (testType == TEST_TYPE_READ)
+      break;
+    case TEST_TYPE_READ:
       readTest(fs);
-    if (testType == TEST_TYPE_APPEND)
+      break;
+    case TEST_TYPE_APPEND:
       appendTest(fs);
+      break;
+    case TEST_TYPE_READ_RANDOM:
+    case TEST_TYPE_READ_BACKWARD:
+    case TEST_TYPE_READ_SKIP:
+      randomReadTest(fs);
+    }
     long execTime = System.currentTimeMillis() - tStart;
   
     analyzeResult(fs, testType, execTime, resFileName);
@@ -547,9 +787,9 @@ public class TestDFSIO extends TestCase 
   static long parseSize(String arg) {
     String[] args = arg.split("\\D", 2);  // get digits
     assert args.length <= 2;
-    long fileSize = Long.parseLong(args[0]);
+    long nrBytes = Long.parseLong(args[0]);
     String bytesMult = arg.substring(args[0].length()); // get byte multiple
-    return fileSize * ByteMultiple.parseString(bytesMult).value();
+    return nrBytes * ByteMultiple.parseString(bytesMult).value();
   }
 
   static float toMB(long bytes) {
@@ -557,17 +797,11 @@ public class TestDFSIO extends TestCase 
   }
 
   private void analyzeResult(	FileSystem fs,
-                              int testType,
+                              TestType testType,
                               long execTime,
                               String resFileName
                             ) throws IOException {
-    Path reduceFile;
-    if (testType == TEST_TYPE_WRITE)
-      reduceFile = new Path(getWriteDir(config), "part-00000");
-    else if (testType == TEST_TYPE_APPEND)
-      reduceFile = new Path(getAppendDir(config), "part-00000");
-    else // if (testType == TEST_TYPE_READ)
-      reduceFile = new Path(getReadDir(config), "part-00000");
+    Path reduceFile = getReduceFilePath(testType);
     long tasks = 0;
     long size = 0;
     long time = 0;
@@ -601,10 +835,7 @@ public class TestDFSIO extends TestCase 
     double med = rate / 1000 / tasks;
     double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
     String resultLines[] = {
-      "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
-                                    (testType == TEST_TYPE_READ) ? "read" : 
-                                    (testType == TEST_TYPE_APPEND) ? "append" : 
-                                    "unknown"),
+      "----- TestDFSIO ----- : " + testType,
       "           Date & time: " + new Date(System.currentTimeMillis()),
       "       Number of files: " + tasks,
       "Total MBytes processed: " + toMB(size),
@@ -626,6 +857,27 @@ public class TestDFSIO extends TestCase 
     }
   }
 
+  private Path getReduceFilePath(TestType testType) {
+    switch(testType) {
+    case TEST_TYPE_WRITE:
+      return new Path(getWriteDir(config), "part-00000");
+    case TEST_TYPE_APPEND:
+      return new Path(getAppendDir(config), "part-00000");
+    case TEST_TYPE_READ:
+      return new Path(getReadDir(config), "part-00000");
+    case TEST_TYPE_READ_RANDOM:
+    case TEST_TYPE_READ_BACKWARD:
+    case TEST_TYPE_READ_SKIP:
+      return new Path(getRandomReadDir(config), "part-00000");
+    }
+    return null;
+  }
+
+  private void analyzeResult(FileSystem fs, TestType testType, long execTime)
+      throws IOException {
+    analyzeResult(fs, testType, execTime, DEFAULT_RES_FILE_NAME);
+  }
+
   private void cleanup(FileSystem fs)
   throws IOException {
     LOG.info("Cleaning up test files");

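Since TestDFSIO implements Tool, the revised command line can also be driven programmatically through ToolRunner. A hedged sketch, not part of this commit, assuming the jobclient test jar is on the classpath; sizes are arbitrary, -fileSize is still accepted as an alias for -size, and a codec for the new -compression option would be named by its full class (e.g. org.apache.hadoop.io.compress.GzipCodec):

import org.apache.hadoop.fs.TestDFSIO;
import org.apache.hadoop.util.ToolRunner;

public class TestDFSIODriver {
  public static void main(String[] args) throws Exception {
    // Write 4 files of 128 MB each.
    ToolRunner.run(new TestDFSIO(), new String[] {
        "-write", "-nrFiles", "4", "-size", "128MB"});
    // Sequential read of the same data.
    ToolRunner.run(new TestDFSIO(), new String[] {
        "-read", "-nrFiles", "4", "-size", "128MB"});
    // New random-read mode; the parser only accepts -random after -read.
    ToolRunner.run(new TestDFSIO(), new String[] {
        "-read", "-random", "-nrFiles", "4", "-size", "128MB"});
    // Skip-read with an explicit 1 MB gap between bufferSize-sized reads.
    ToolRunner.run(new TestDFSIO(), new String[] {
        "-read", "-skip", "-skipSize", "1MB", "-nrFiles", "4", "-size", "128MB"});
    // Remove the test directories.
    ToolRunner.run(new TestDFSIO(), new String[] {"-clean"});
  }
}

Each run writes its summary to TestDFSIO_results.log unless -resFile names another file.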
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Operation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Operation.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Operation.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Operation.java Fri Oct 19 02:25:55 2012
@@ -41,7 +41,9 @@ abstract class Operation {
     this.config = cfg;
     this.type = type;
     this.rnd = rnd;
-    this.finder = new PathFinder(cfg, rnd);
+    // Use a new Random instance so that the sequence of file names produced is
+    // the same even in case of unsuccessful operations
+    this.finder = new PathFinder(cfg, new Random(rnd.nextInt()));
   }
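A small sketch of why the derived Random above keeps PathFinder's name sequence stable (values are illustrative, not from the patch): the child is seeded exactly once, so any extra draws the operation later makes on its own rnd, for example around an unsuccessful operation, no longer shift the paths that get generated.

import java.util.Random;

public class DerivedSeedDemo {
  public static void main(String[] args) {
    Random parent = new Random(42);
    // Seed the child once, as the constructor above now does for PathFinder.
    Random child = new Random(parent.nextInt());
    // Extra draws on the parent happen after the child's seed is fixed,
    // so the child's sequence is unaffected by them.
    parent.nextInt();
    System.out.println(child.nextInt());
  }
}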
 
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/SliveMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/SliveMapper.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/SliveMapper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/SliveMapper.java Fri Oct 19 02:25:55 2012
@@ -32,6 +32,8 @@ import org.apache.hadoop.mapred.MapReduc
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -50,8 +52,7 @@ public class SliveMapper extends MapRedu
 
   private FileSystem filesystem;
   private ConfigExtractor config;
-  private WeightSelector selector;
-  private Random rnd;
+  private int taskId;
 
   /*
    * (non-Javadoc)
@@ -70,19 +71,19 @@ public class SliveMapper extends MapRedu
     }
     try {
       config = new ConfigExtractor(conf);
-      Long rndSeed = config.getRandomSeed();
-      if (rndSeed != null) {
-        rnd = new Random(rndSeed);
-      } else {
-        rnd = new Random();
-      }
-      selector = new WeightSelector(config, rnd);
       ConfigExtractor.dumpOptions(config);
     } catch (Exception e) {
       LOG.error("Unable to setup slive " + StringUtils.stringifyException(e));
       throw new RuntimeException("Unable to setup slive configuration", e);
     }
-
+    if(conf.get(MRJobConfig.TASK_ATTEMPT_ID) != null ) {
+      this.taskId = TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID))
+        .getTaskID().getId();
+    } else {
+      // So that branch-1/0.20 can run this same code as well
+      this.taskId = TaskAttemptID.forName(conf.get("mapred.task.id"))
+          .getTaskID().getId();
+    }
   }
 
   /**
@@ -95,15 +96,6 @@ public class SliveMapper extends MapRedu
   }
 
   /**
-   * Gets the operation selector to use for this object
-   * 
-   * @return WeightSelector
-   */
-  private WeightSelector getSelector() {
-    return selector;
-  }
-
-  /**
    * Logs to the given reporter and logs to the internal logger at info level
    * 
    * @param r
@@ -154,6 +146,10 @@ public class SliveMapper extends MapRedu
       Reporter reporter) throws IOException {
     logAndSetStatus(reporter, "Running slive mapper for dummy key " + key
         + " and dummy value " + value);
+    //Add taskID to randomSeed to deterministically seed rnd.
+    Random rnd = config.getRandomSeed() != null ?
+      new Random(this.taskId + config.getRandomSeed()) : new Random();
+    WeightSelector selector = new WeightSelector(config, rnd);
     long startTime = Timer.now();
     long opAm = 0;
     long sleepOps = 0;
@@ -163,7 +159,6 @@ public class SliveMapper extends MapRedu
     if (sleepRange != null) {
       sleeper = new SleepOp(getConfig(), rnd);
     }
-    WeightSelector selector = getSelector();
     while (Timer.elapsed(startTime) < duration) {
       try {
         logAndSetStatus(reporter, "Attempting to select operation #"

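A hedged sketch of the per-task seeding used in map() above; the attempt id below is just the standard example format and 12345 stands in for the configured random seed:

import java.util.Random;
import org.apache.hadoop.mapreduce.TaskAttemptID;

public class TaskSeedDemo {
  public static void main(String[] args) {
    // Recover the numeric task id from the attempt id, as configure() does above.
    int taskId = TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0")
        .getTaskID().getId();                 // -> 5
    long randomSeed = 12345L;                 // stand-in for config.getRandomSeed()
    // Same seed plus same task id reproduces the same operation sequence;
    // different tasks get distinct but still deterministic sequences.
    Random rnd = new Random(taskId + randomSeed);
    System.out.println(taskId + " " + rnd.nextInt());
  }
}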
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java Fri Oct 19 02:25:55 2012
@@ -210,19 +210,10 @@ public class MRCaching {
     fs.copyFromLocalFile(tarPath1, cachePath);
     fs.copyFromLocalFile(tarPath2, cachePath);
   }
- 
-  public static TestResult launchMRCache(String indir,
-                                         String outdir, String cacheDir, 
-                                         JobConf conf, String input) 
-  throws IOException {
-    setupCache(cacheDir, FileSystem.get(conf));
-    return launchMRCache(indir,outdir, cacheDir, conf, input, false); 
-  }
   
   public static TestResult launchMRCache(String indir,
                                          String outdir, String cacheDir, 
-                                         JobConf conf, String input,
-                                         boolean withSymlink)
+                                         JobConf conf, String input)
     throws IOException {
     String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp"))
       .toString().replace(' ', '+');
@@ -256,24 +247,13 @@ public class MRCaching {
     conf.setNumReduceTasks(1);
     conf.setSpeculativeExecution(false);
     URI[] uris = new URI[6];
-    if (!withSymlink) {
-      conf.setMapperClass(MRCaching.MapClass.class);
-      uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
-      uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
-      uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
-      uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
-      uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
-      uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
-    } else {
-      DistributedCache.createSymlink(conf);
-      conf.setMapperClass(MRCaching.MapClass2.class);
-      uris[0] = fs.getUri().resolve(cacheDir + "/test.txt#" + "test.txt");
-      uris[1] = fs.getUri().resolve(cacheDir + "/test.jar#" + "testjar");
-      uris[2] = fs.getUri().resolve(cacheDir + "/test.zip#" + "testzip");
-      uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz#" + "testtgz");
-      uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz");
-      uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
-    }
+    conf.setMapperClass(MRCaching.MapClass2.class);
+    uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
+    uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
+    uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
+    uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
+    uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
+    uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
     DistributedCache.addCacheFile(uris[0], conf);
 
     // Save expected file sizes

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java Fri Oct 19 02:25:55 2012
@@ -58,8 +58,12 @@ public class MiniMRClientClusterFactory 
     Job job = Job.getInstance(conf);
 
     job.addFileToClassPath(appJar);
-    String callerJar = JarFinder.getJar(caller);
-    job.setJar(callerJar);
+
+    Path callerJar = new Path(JarFinder.getJar(caller));
+    Path remoteCallerJar = new Path(testRootDir, callerJar.getName());
+    fs.copyFromLocalFile(callerJar, remoteCallerJar);
+    fs.setPermission(remoteCallerJar, new FsPermission("744"));
+    job.addFileToClassPath(remoteCallerJar);
 
     MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller
         .getName(), noOfNMs);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java Fri Oct 19 02:25:55 2012
@@ -33,7 +33,6 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.fs.*;
@@ -345,7 +344,8 @@ public class SortValidator extends Confi
 
       FileInputFormat.setInputPaths(jobConf, sortInput);
       FileInputFormat.addInputPath(jobConf, sortOutput);
-      Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
+      Path outputPath = new Path(new Path("/tmp",
+           "sortvalidate"), UUID.randomUUID().toString());
       if (defaultfs.exists(outputPath)) {
         defaultfs.delete(outputPath, true);
       }
@@ -365,31 +365,44 @@ public class SortValidator extends Confi
       Date startTime = new Date();
       System.out.println("Job started: " + startTime);
       JobClient.runJob(jobConf);
-      Date end_time = new Date();
-      System.out.println("Job ended: " + end_time);
-      System.out.println("The job took " + 
-                         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
-      
-      // Check to ensure that the statistics of the 
-      // framework's sort-input and sort-output match
-      SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
-                                                          new Path(outputPath, "part-00000"), defaults);
-      IntWritable k1 = new IntWritable();
-      IntWritable k2 = new IntWritable();
-      RecordStatsWritable v1 = new RecordStatsWritable();
-      RecordStatsWritable v2 = new RecordStatsWritable();
-      if (!stats.next(k1, v1)) {
-        throw new IOException("Failed to read record #1 from reduce's output");
-      }
-      if (!stats.next(k2, v2)) {
-        throw new IOException("Failed to read record #2 from reduce's output");
-      }
-
-      if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) || 
-          v1.getChecksum() != v2.getChecksum()) {
-        throw new IOException("(" + 
-                              v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
-                              v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
+      try {
+        Date end_time = new Date();
+        System.out.println("Job ended: " + end_time);
+        System.out.println("The job took " + 
+            (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+
+        // Check to ensure that the statistics of the 
+        // framework's sort-input and sort-output match
+        SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
+            new Path(outputPath, "part-00000"), defaults);
+        try {
+          IntWritable k1 = new IntWritable();
+          IntWritable k2 = new IntWritable();
+          RecordStatsWritable v1 = new RecordStatsWritable();
+          RecordStatsWritable v2 = new RecordStatsWritable();
+          if (!stats.next(k1, v1)) {
+            throw new IOException(
+                "Failed to read record #1 from reduce's output");
+          }
+          if (!stats.next(k2, v2)) {
+            throw new IOException(
+                "Failed to read record #2 from reduce's output");
+          }
+
+          if ((v1.getBytes() != v2.getBytes()) || 
+              (v1.getRecords() != v2.getRecords()) || 
+              v1.getChecksum() != v2.getChecksum()) {
+            throw new IOException("(" + 
+                v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum()
+                + ") v/s (" +
+                v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum()
+                + ")");
+          }
+        } finally {
+          stats.close();
+        }
+      } finally {
+        defaultfs.delete(outputPath, true);
       }
     }
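The restructured block above amounts to a unique per-run output directory plus guaranteed deletion. A bare-bones sketch of that pattern with the job itself elided (the /tmp/sortvalidate parent matches the code above; everything else is illustrative):

import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UniqueOutputSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Fresh directory per run, so repeated or concurrent runs never collide.
    Path outputPath = new Path(new Path("/tmp", "sortvalidate"),
        UUID.randomUUID().toString());
    try {
      // ... run the RecordStatsChecker job and compare the two stats records ...
    } finally {
      // Always remove this run's output, mirroring the finally block above.
      fs.delete(outputPath, true);
    }
  }
}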
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Fri Oct 19 02:25:55 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -218,7 +219,8 @@ public class TestClientServiceDelegate {
     GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
     when(jobReportResponse1.getJobReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
-            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null,
+            false, ""));
 
     // First AM returns a report with jobName firstGen and simulates AM shutdown
     // on second invocation.
@@ -230,7 +232,8 @@ public class TestClientServiceDelegate {
     GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
     when(jobReportResponse2.getJobReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
-            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null,
+            false, ""));
 
     // Second AM generation returns a report with jobName secondGen
     MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
@@ -404,17 +407,23 @@ public class TestClientServiceDelegate {
   }
 
   private ApplicationReport getFinishedApplicationReport() {
-    return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
-        1234, 5), "user", "queue", "appname", "host", 124, null,
-        YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
-        FinalApplicationStatus.SUCCEEDED, null, "N/A");
+    ApplicationId appId = BuilderUtils.newApplicationId(1234, 5);
+    ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
+        "appname", "host", 124, null, YarnApplicationState.FINISHED,
+        "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
+        "N/A");
   }
 
   private ApplicationReport getRunningApplicationReport(String host, int port) {
-    return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
-        1234, 5), "user", "queue", "appname", host, port, null,
-        YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
-        FinalApplicationStatus.UNDEFINED, null, "N/A");
+    ApplicationId appId = BuilderUtils.newApplicationId(1234, 5);
+    ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
+        "appname", host, port, null, YarnApplicationState.RUNNING,
+        "diagnostics", "url", 0, 0, FinalApplicationStatus.UNDEFINED, null,
+        "N/A");
   }
 
   private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Fri Oct 19 02:25:55 2012
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
 import junit.framework.TestCase;
 
@@ -95,7 +96,7 @@ public class TestFileInputFormat extends
   }
 
   private void createInputs(FileSystem fs, Path inDir, String fileName)
-  throws IOException {
+      throws IOException, TimeoutException, InterruptedException {
     // create a multi-block file on hdfs
     Path path = new Path(inDir, fileName);
     final short replication = 2;
@@ -157,7 +158,7 @@ public class TestFileInputFormat extends
     }
   }
 
-  public void testMultiLevelInput() throws IOException {
+  public void testMultiLevelInput() throws Exception {
     JobConf job = new JobConf(conf);
 
     job.setBoolean("dfs.replication.considerLoad", false);
@@ -291,7 +292,8 @@ public class TestFileInputFormat extends
   }
 
   static void writeFile(Configuration conf, Path name,
-      short replication, int numBlocks) throws IOException {
+      short replication, int numBlocks)
+      throws IOException, TimeoutException, InterruptedException {
     FileSystem fileSys = FileSystem.get(conf);
 
     FSDataOutputStream stm = fileSys.create(name, true,

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java Fri Oct 19 02:25:55 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,7 +36,7 @@ public class TestIFileStreams extends Te
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     for (int i = 0; i < DLEN; ++i) {
       assertEquals(i, ifis.read());
     }
@@ -54,7 +55,7 @@ public class TestIFileStreams extends Te
     final byte[] b = dob.getData();
     ++b[17];
     dib.reset(b, DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     int i = 0;
     try {
       while (i < DLEN) {
@@ -83,7 +84,7 @@ public class TestIFileStreams extends Te
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 100);
+    IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
     int i = 0;
     try {
       while (i < DLEN - 8) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Fri Oct 19 02:25:55 2012
@@ -48,7 +48,7 @@ public class TestMiniMRDFSCaching extend
                                             "/cachedir",
                                             mr.createJobConf(),
                                             "The quick brown fox\nhas many silly\n"
-                                            + "red fox sox\n", false);
+                                            + "red fox sox\n");
       assertTrue("Archives not matching", ret.isOutputOk);
       // launch MR cache with symlinks
       ret = MRCaching.launchMRCache("/testing/wc/input",
@@ -56,7 +56,7 @@ public class TestMiniMRDFSCaching extend
                                     "/cachedir",
                                     mr.createJobConf(),
                                     "The quick brown fox\nhas many silly\n"
-                                    + "red fox sox\n", true);
+                                    + "red fox sox\n");
       assertTrue("Archives not matching", ret.isOutputOk);
     } finally {
       if (fileSys != null) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultipleLevelCaching.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultipleLevelCaching.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultipleLevelCaching.java Fri Oct 19 02:25:55 2012
@@ -71,13 +71,13 @@ public class TestMultipleLevelCaching ex
     return rack.toString();
   }
 
-  public void testMultiLevelCaching() throws IOException {
+  public void testMultiLevelCaching() throws Exception {
     for (int i = 1 ; i <= MAX_LEVEL; ++i) {
       testCachingAtLevel(i);
     }
   }
 
-  private void testCachingAtLevel(int level) throws IOException {
+  private void testCachingAtLevel(int level) throws Exception {
     String namenode = null;
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java Fri Oct 19 02:25:55 2012
@@ -50,7 +50,7 @@ public class TestResourceMgrDelegate {
    */
   @Test
   public void testGetRootQueues() throws IOException, InterruptedException {
-    ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
+    final ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
     GetQueueInfoResponse response = Mockito.mock(GetQueueInfoResponse.class);
     org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
       Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class);
@@ -59,12 +59,17 @@ public class TestResourceMgrDelegate {
       GetQueueInfoRequest.class))).thenReturn(response);
 
     ResourceMgrDelegate delegate = new ResourceMgrDelegate(
-      new YarnConfiguration(), applicationsManager);
+      new YarnConfiguration()) {
+      @Override
+      public synchronized void start() {
+        this.rmClient = applicationsManager;
+      }
+    };
     delegate.getRootQueues();
 
     ArgumentCaptor<GetQueueInfoRequest> argument =
       ArgumentCaptor.forClass(GetQueueInfoRequest.class);
-    Mockito.verify(delegate.applicationsManager).getQueueInfo(
+    Mockito.verify(applicationsManager).getQueueInfo(
       argument.capture());
 
     Assert.assertTrue("Children of root queue not requested",
@@ -75,7 +80,7 @@ public class TestResourceMgrDelegate {
 
   @Test
   public void tesAllJobs() throws Exception {
-    ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
+    final ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
     GetAllApplicationsResponse allApplicationsResponse = Records
         .newRecord(GetAllApplicationsResponse.class);
     List<ApplicationReport> applications = new ArrayList<ApplicationReport>();
@@ -93,7 +98,12 @@ public class TestResourceMgrDelegate {
             .any(GetAllApplicationsRequest.class))).thenReturn(
         allApplicationsResponse);
     ResourceMgrDelegate resourceMgrDelegate = new ResourceMgrDelegate(
-        new YarnConfiguration(), applicationsManager);
+      new YarnConfiguration()) {
+      @Override
+      public synchronized void start() {
+        this.rmClient = applicationsManager;
+      }
+    };
     JobStatus[] allJobs = resourceMgrDelegate.getAllJobs();
 
     Assert.assertEquals(State.FAILED, allJobs[0].getState());

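Both tests above stop passing the mocked ClientRMProtocol through the ResourceMgrDelegate constructor; instead they subclass the delegate and override start() to plant the mock in the rmClient field. A minimal sketch of that injection pattern, assuming rmClient stays visible to subclasses as it is in the patched delegate:

import org.apache.hadoop.mapred.ResourceMgrDelegate;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.mockito.Mockito;

public class ResourceMgrDelegateInjectionSketch {
  static ResourceMgrDelegate newDelegateWithMockRM() {
    final ClientRMProtocol rm = Mockito.mock(ClientRMProtocol.class);
    // Overriding start() keeps the delegate from opening a real RM proxy and
    // lets the test talk to the Mockito mock instead.
    return new ResourceMgrDelegate(new YarnConfiguration()) {
      @Override
      public synchronized void start() {
        this.rmClient = rm;
      }
    };
  }
}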
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java Fri Oct 19 02:25:55 2012
@@ -31,6 +31,7 @@ import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -449,11 +450,14 @@ public class UtilsForTests {
   static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, 
                           String mapSignalFile, 
                           String reduceSignalFile, int replication) 
-  throws IOException {
-    writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile), 
-              (short)replication);
-    writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile), 
-              (short)replication);
+      throws IOException, TimeoutException {
+    try {
+      writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile), 
+                (short)replication);
+      writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile), (short)replication);
+    } catch (InterruptedException ie) {
+      // Ignore
+    }
   }
   
   /**
@@ -462,12 +466,16 @@ public class UtilsForTests {
   static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, 
                           boolean isMap, String mapSignalFile, 
                           String reduceSignalFile)
-  throws IOException {
-    //  signal the maps to complete
-    writeFile(dfs.getNameNode(), fileSys.getConf(),
-              isMap 
-              ? new Path(mapSignalFile)
-              : new Path(reduceSignalFile), (short)1);
+      throws IOException, TimeoutException {
+    try {
+      //  signal the maps to complete
+      writeFile(dfs.getNameNode(), fileSys.getConf(),
+                isMap 
+                ? new Path(mapSignalFile)
+                : new Path(reduceSignalFile), (short)1);
+    } catch (InterruptedException ie) {
+      // Ignore
+    }
   }
   
   static String getSignalFile(Path dir) {
@@ -483,7 +491,8 @@ public class UtilsForTests {
   }
   
   static void writeFile(NameNode namenode, Configuration conf, Path name, 
-      short replication) throws IOException {
+                        short replication)
+      throws IOException, TimeoutException, InterruptedException {
     FileSystem fileSys = FileSystem.get(conf);
     SequenceFile.Writer writer = 
       SequenceFile.createWriter(fileSys, conf, name, 

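The new TimeoutException on signalTasks/writeFile lines up with a write-then-wait-for-replication pattern; DFSTestUtil.waitReplication declares IOException, InterruptedException and TimeoutException at this revision. A hedged sketch of such a helper with a hypothetical name (the actual writeFile body in UtilsForTests is only partially visible in the hunk):

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;

public class SignalFileSketch {
  // Writes a small marker file and waits until it reaches the requested
  // replication, surfacing the same exceptions the patched helpers now declare.
  static void writeSignalFile(Configuration conf, Path name, short replication)
      throws IOException, TimeoutException, InterruptedException {
    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream out = fs.create(name, replication);
    out.writeBytes("signal");
    out.close();
    DFSTestUtil.waitReplication(fs, name, replication);  // may time out or be interrupted
  }
}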
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java Fri Oct 19 02:25:55 2012
@@ -1,120 +1,120 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.LocalJobRunner;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.junit.Test;
-
-public class TestClientProtocolProviderImpls extends TestCase {
-
-  @Test
-  public void testClusterWithLocalClientProvider() throws Exception {
-
-    Configuration conf = new Configuration();
-
-    try {
-      conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
-      new Cluster(conf);
-      fail("Cluster should not be initialized with incorrect framework name");
-    } catch (IOException e) {
-
-    }
-
-    try {
-      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
-      conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
-
-      new Cluster(conf);
-      fail("Cluster with Local Framework name should use local JT address");
-    } catch (IOException e) {
-
-    }
-
-    try {
-      conf.set(JTConfig.JT_IPC_ADDRESS, "local");
-      Cluster cluster = new Cluster(conf);
-      assertTrue(cluster.getClient() instanceof LocalJobRunner);
-      cluster.close();
-    } catch (IOException e) {
-
-    }
-  }
-
-  @Test
-  public void testClusterWithJTClientProvider() throws Exception {
-
-    Configuration conf = new Configuration();
-    try {
-      conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
-      new Cluster(conf);
-      fail("Cluster should not be initialized with incorrect framework name");
-
-    } catch (IOException e) {
-
-    }
-
-    try {
-      conf.set(MRConfig.FRAMEWORK_NAME, "classic");
-      conf.set(JTConfig.JT_IPC_ADDRESS, "local");
-      new Cluster(conf);
-      fail("Cluster with classic Framework name shouldnot use local JT address");
-
-    } catch (IOException e) {
-
-    }
-
-    try {
-      conf = new Configuration();
-      conf.set(MRConfig.FRAMEWORK_NAME, "classic");
-      conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
-      Cluster cluster = new Cluster(conf);
-      cluster.close();
-    } catch (IOException e) {
-
-    }
-  }
-
-  @Test
-  public void testClusterException() {
-
-    Configuration conf = new Configuration();
-    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
-    conf.set(JTConfig.JT_IPC_ADDRESS, "local");
-
-    // initializing a cluster with this conf should throw an error.
-    // However the exception thrown should not be specific to either
-    // the job tracker client provider or the local provider
-    boolean errorThrown = false;
-    try {
-      Cluster cluster = new Cluster(conf);
-      cluster.close();
-      fail("Not expected - cluster init should have failed");
-    } catch (IOException e) {
-      errorThrown = true;
-      assert(e.getMessage().contains("Cannot initialize Cluster. Please check"));
-    }
-    assert(errorThrown);
-  }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.junit.Test;
+
+public class TestClientProtocolProviderImpls extends TestCase {
+
+  @Test
+  public void testClusterWithLocalClientProvider() throws Exception {
+
+    Configuration conf = new Configuration();
+
+    try {
+      conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
+      new Cluster(conf);
+      fail("Cluster should not be initialized with incorrect framework name");
+    } catch (IOException e) {
+
+    }
+
+    try {
+      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+      conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
+
+      new Cluster(conf);
+      fail("Cluster with Local Framework name should use local JT address");
+    } catch (IOException e) {
+
+    }
+
+    try {
+      conf.set(JTConfig.JT_IPC_ADDRESS, "local");
+      Cluster cluster = new Cluster(conf);
+      assertTrue(cluster.getClient() instanceof LocalJobRunner);
+      cluster.close();
+    } catch (IOException e) {
+
+    }
+  }
+
+  @Test
+  public void testClusterWithJTClientProvider() throws Exception {
+
+    Configuration conf = new Configuration();
+    try {
+      conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
+      new Cluster(conf);
+      fail("Cluster should not be initialized with incorrect framework name");
+
+    } catch (IOException e) {
+
+    }
+
+    try {
+      conf.set(MRConfig.FRAMEWORK_NAME, "classic");
+      conf.set(JTConfig.JT_IPC_ADDRESS, "local");
+      new Cluster(conf);
+      fail("Cluster with classic Framework name shouldnot use local JT address");
+
+    } catch (IOException e) {
+
+    }
+
+    try {
+      conf = new Configuration();
+      conf.set(MRConfig.FRAMEWORK_NAME, "classic");
+      conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
+      Cluster cluster = new Cluster(conf);
+      cluster.close();
+    } catch (IOException e) {
+
+    }
+  }
+
+  @Test
+  public void testClusterException() {
+
+    Configuration conf = new Configuration();
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
+    conf.set(JTConfig.JT_IPC_ADDRESS, "local");
+
+    // initializing a cluster with this conf should throw an error.
+    // However the exception thrown should not be specific to either
+    // the job tracker client provider or the local provider
+    boolean errorThrown = false;
+    try {
+      Cluster cluster = new Cluster(conf);
+      cluster.close();
+      fail("Not expected - cluster init should have failed");
+    } catch (IOException e) {
+      errorThrown = true;
+      assert(e.getMessage().contains("Cannot initialize Cluster. Please check"));
+    }
+    assert(errorThrown);
+  }
+}

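The provider selection that TestClientProtocolProviderImpls pins down is driven by two keys: MRConfig.FRAMEWORK_NAME picks the ClientProtocolProvider and JTConfig.JT_IPC_ADDRESS has to agree with it. A minimal sketch of the combination that resolves to the local runner (illustrative standalone class, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

public class LocalClusterConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); // "local"
    conf.set(JTConfig.JT_IPC_ADDRESS, "local");  // must agree with the local framework
    Cluster cluster = new Cluster(conf);         // backed by LocalJobRunner underneath
    cluster.close();
  }
}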
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java Fri Oct 19 02:25:55 2012
@@ -1,122 +1,129 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.when;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.LocalJobRunner;
-import org.apache.hadoop.mapred.ResourceMgrDelegate;
-import org.apache.hadoop.mapred.YARNRunner;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.junit.Test;
-
-public class TestYarnClientProtocolProvider extends TestCase {
-  
-  private static final RecordFactory recordFactory = RecordFactoryProvider.
-      getRecordFactory(null);
-  
-  @Test
-  public void testClusterWithYarnClientProtocolProvider() throws Exception {
-
-    Configuration conf = new Configuration(false);
-    Cluster cluster = null;
-
-    try {
-      cluster = new Cluster(conf);
-    } catch (Exception e) {
-      throw new Exception(
-          "Failed to initialize a local runner w/o a cluster framework key", e);
-    }
-    
-    try {
-      assertTrue("client is not a LocalJobRunner",
-          cluster.getClient() instanceof LocalJobRunner);
-    } finally {
-      if (cluster != null) {
-        cluster.close();
-      }
-    }
-    
-    try {
-      conf = new Configuration();
-      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
-      cluster = new Cluster(conf);
-      ClientProtocol client = cluster.getClient();
-      assertTrue("client is a YARNRunner", client instanceof YARNRunner);
-    } catch (IOException e) {
-
-    } finally {
-      if (cluster != null) {
-        cluster.close();
-      }
-    }
-  }
-
- 
-  @Test
-  public void testClusterGetDelegationToken() throws Exception {
-
-    Configuration conf = new Configuration(false);
-    Cluster cluster = null;
-    try {
-      conf = new Configuration();
-      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
-      cluster = new Cluster(conf);
-      YARNRunner yrunner = (YARNRunner) cluster.getClient();
-      GetDelegationTokenResponse getDTResponse = 
-          recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
-      DelegationToken rmDTToken = recordFactory.newRecordInstance(
-          DelegationToken.class);
-      rmDTToken.setIdentifier(ByteBuffer.wrap(new byte[2]));
-      rmDTToken.setKind("Testclusterkind");
-      rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes()));
-      rmDTToken.setService("0.0.0.0:8032");
-      getDTResponse.setRMDelegationToken(rmDTToken);
-      ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class);
-      when(cRMProtocol.getDelegationToken(any(
-          GetDelegationTokenRequest.class))).thenReturn(getDTResponse);
-      ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate(
-          new YarnConfiguration(conf), cRMProtocol);
-      yrunner.setResourceMgrDelegate(rmgrDelegate);
-      Token t = cluster.getDelegationToken(new Text(" "));
-      assertTrue("Testclusterkind".equals(t.getKind().toString()));
-    } finally {
-      if (cluster != null) {
-        cluster.close();
-      }
-    }
-  }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapred.ResourceMgrDelegate;
+import org.apache.hadoop.mapred.YARNRunner;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.Test;
+
+public class TestYarnClientProtocolProvider extends TestCase {
+  
+  private static final RecordFactory recordFactory = RecordFactoryProvider.
+      getRecordFactory(null);
+  
+  @Test
+  public void testClusterWithYarnClientProtocolProvider() throws Exception {
+
+    Configuration conf = new Configuration(false);
+    Cluster cluster = null;
+
+    try {
+      cluster = new Cluster(conf);
+    } catch (Exception e) {
+      throw new Exception(
+          "Failed to initialize a local runner w/o a cluster framework key", e);
+    }
+    
+    try {
+      assertTrue("client is not a LocalJobRunner",
+          cluster.getClient() instanceof LocalJobRunner);
+    } finally {
+      if (cluster != null) {
+        cluster.close();
+      }
+    }
+    
+    try {
+      conf = new Configuration();
+      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+      cluster = new Cluster(conf);
+      ClientProtocol client = cluster.getClient();
+      assertTrue("client is a YARNRunner", client instanceof YARNRunner);
+    } catch (IOException e) {
+
+    } finally {
+      if (cluster != null) {
+        cluster.close();
+      }
+    }
+  }
+
+ 
+  @Test
+  public void testClusterGetDelegationToken() throws Exception {
+
+    Configuration conf = new Configuration(false);
+    Cluster cluster = null;
+    try {
+      conf = new Configuration();
+      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+      cluster = new Cluster(conf);
+      YARNRunner yrunner = (YARNRunner) cluster.getClient();
+      GetDelegationTokenResponse getDTResponse = 
+          recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
+      DelegationToken rmDTToken = recordFactory.newRecordInstance(
+          DelegationToken.class);
+      rmDTToken.setIdentifier(ByteBuffer.wrap(new byte[2]));
+      rmDTToken.setKind("Testclusterkind");
+      rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes()));
+      rmDTToken.setService("0.0.0.0:8032");
+      getDTResponse.setRMDelegationToken(rmDTToken);
+      final ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class);
+      when(cRMProtocol.getDelegationToken(any(
+          GetDelegationTokenRequest.class))).thenReturn(getDTResponse);
+      ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate(
+          new YarnConfiguration(conf)) {
+        @Override
+        public synchronized void start() {
+          this.rmClient = cRMProtocol;
+        }
+      };
+      yrunner.setResourceMgrDelegate(rmgrDelegate);
+      Token t = cluster.getDelegationToken(new Text(" "));
+      assertTrue("Token kind is instead " + t.getKind().toString(),
+        "Testclusterkind".equals(t.getKind().toString()));
+    } finally {
+      if (cluster != null) {
+        cluster.close();
+      }
+    }
+  }
+
+}

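The rewritten delegation-token test keeps the same idea but injects the mocked ClientRMProtocol by overriding ResourceMgrDelegate.start(), as in TestResourceMgrDelegate above. The Mockito/record-factory stubbing it relies on, restated as one hedged helper (names here are illustrative):

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.nio.ByteBuffer;

import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

public class MockDelegationTokenRMSketch {
  // Builds a mock RM whose getDelegationToken always answers with a token of the given kind.
  static ClientRMProtocol mockRMWithToken(String kind) throws Exception {
    RecordFactory records = RecordFactoryProvider.getRecordFactory(null);
    DelegationToken token = records.newRecordInstance(DelegationToken.class);
    token.setIdentifier(ByteBuffer.wrap(new byte[2]));
    token.setKind(kind);
    token.setPassword(ByteBuffer.wrap("testcluster".getBytes()));
    token.setService("0.0.0.0:8032");
    GetDelegationTokenResponse response =
        records.newRecordInstance(GetDelegationTokenResponse.class);
    response.setRMDelegationToken(token);
    ClientRMProtocol rm = mock(ClientRMProtocol.class);
    when(rm.getDelegationToken(any(GetDelegationTokenRequest.class))).thenReturn(response);
    return rm;
  }
}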
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Fri Oct 19 02:25:55 2012
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.zip.GZIPOutputStream;
+import java.util.concurrent.TimeoutException;
 
 import junit.framework.TestCase;
 
@@ -125,9 +126,9 @@ public class TestCombineFileInputFormat 
       BlockLocation[] locs =
         super.getFileBlockLocations(stat, start, len);
       if (name.equals(fileWithMissingBlocks)) {
-        System.out.println("Returing missing blocks for " + fileWithMissingBlocks);
-        locs[0] = new BlockLocation(new String[0], new String[0],
-            locs[0].getOffset(), locs[0].getLength());
+        System.out.println("Returning missing blocks for " + fileWithMissingBlocks);
+        locs[0] = new HdfsBlockLocation(new BlockLocation(new String[0],
+            new String[0], locs[0].getOffset(), locs[0].getLength()), null);
       }
       return locs;
     }
@@ -278,7 +279,7 @@ public class TestCombineFileInputFormat 
     assertFalse(rr.nextKeyValue());
   }
 
-  public void testSplitPlacement() throws IOException {
+  public void testSplitPlacement() throws Exception {
     MiniDFSCluster dfs = null;
     FileSystem fileSys = null;
     try {
@@ -678,7 +679,8 @@ public class TestCombineFileInputFormat 
   }
 
   static void writeFile(Configuration conf, Path name,
-      short replication, int numBlocks) throws IOException {
+                        short replication, int numBlocks)
+      throws IOException, TimeoutException, InterruptedException {
     FileSystem fileSys = FileSystem.get(conf);
 
     FSDataOutputStream stm = fileSys.create(name, true,
@@ -689,7 +691,8 @@ public class TestCombineFileInputFormat 
 
   // Creates the gzip file and return the FileStatus
   static FileStatus writeGzipFile(Configuration conf, Path name,
-      short replication, int numBlocks) throws IOException {
+      short replication, int numBlocks)
+      throws IOException, TimeoutException, InterruptedException {
     FileSystem fileSys = FileSystem.get(conf);
 
     GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
@@ -699,7 +702,8 @@ public class TestCombineFileInputFormat 
   }
 
   private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
-      OutputStream out, short replication, int numBlocks) throws IOException {
+        OutputStream out, short replication, int numBlocks)
+      throws IOException, TimeoutException, InterruptedException {
     for (int i = 0; i < numBlocks; i++) {
       out.write(databuf);
     }
@@ -707,7 +711,7 @@ public class TestCombineFileInputFormat 
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
   
-  public void testSplitPlacementForCompressedFiles() throws IOException {
+  public void testSplitPlacementForCompressedFiles() throws Exception {
     MiniDFSCluster dfs = null;
     FileSystem fileSys = null;
     try {
@@ -1058,7 +1062,7 @@ public class TestCombineFileInputFormat 
   /**
    * Test that CFIF can handle missing blocks.
    */
-  public void testMissingBlocks() throws IOException {
+  public void testMissingBlocks() throws Exception {
     String namenode = null;
     MiniDFSCluster dfs = null;
     FileSystem fileSys = null;

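In the CombineFileInputFormat test, the simulated missing block is now wrapped in an HdfsBlockLocation (with a null LocatedBlock), and writeFile/writeGzipFile pick up TimeoutException/InterruptedException because they wait for replication via DFSTestUtil.waitReplication. A small sketch of the wrapping, assuming only the (BlockLocation, LocatedBlock) constructor shown in the hunk:

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.HdfsBlockLocation;

public class MissingBlockSketch {
  // Builds a location entry with no hosts, wrapped the way the updated test does:
  // an empty BlockLocation inside an HdfsBlockLocation with no backing LocatedBlock.
  static BlockLocation missingBlockAt(long offset, long length) {
    return new HdfsBlockLocation(
        new BlockLocation(new String[0], new String[0], offset, length), null);
  }
}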

