hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r1390161 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/
Date Tue, 25 Sep 2012 21:36:31 GMT
Author: shv
Date: Tue Sep 25 21:36:30 2012
New Revision: 1390161

URL: http://svn.apache.org/viewvc?rev=1390161&view=rev
Log:
MAPREDUCE-4651. Benchmarking random reads with DFSIO. Contributed by Konstantin Shvachko.

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1390161&r1=1390160&r2=1390161&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Sep 25 21:36:30
2012
@@ -49,9 +49,6 @@ Release 2.0.2-alpha - 2012-09-07 
     MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached 
     (rkanter via tucu)
 
-    MAPREDUCE-2786. Add compression option for TestDFSIO.
-    (Plamen Jeliazkov via shv)
-
     MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal
     interface to allow schedulers to maintain their own. (acmurthy) 
 
@@ -409,9 +406,14 @@ Release 0.23.4 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-2786. Add compression option for TestDFSIO.
+    (Plamen Jeliazkov via shv)
+
     MAPREDUCE-4645. Provide a random seed to Slive to make the sequence
     of file names deterministic. (Ravi Prakash via shv)
 
+    MAPREDUCE-4651. Benchmarking random reads with DFSIO. (shv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java?rev=1390161&r1=1390160&r2=1390161&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java
Tue Sep 25 21:36:30 2012
@@ -17,14 +17,13 @@
  */
 package org.apache.hadoop.fs;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Base mapper class for IO operations.
@@ -35,7 +34,6 @@ import org.apache.hadoop.util.Reflection
  * statistics data to be collected by subsequent reducers.
  * 
  */
-@SuppressWarnings("deprecation")
 public abstract class IOMapperBase<T> extends Configured
     implements Mapper<Text, LongWritable, Text, Text> {
   
@@ -43,7 +41,7 @@ public abstract class IOMapperBase<T> ex
   protected int bufferSize;
   protected FileSystem fs;
   protected String hostName;
-  protected CompressionCodec compressionCodec;
+  protected Closeable stream;
 
   public IOMapperBase() { 
   }
@@ -62,22 +60,6 @@ public abstract class IOMapperBase<T> ex
     } catch(Exception e) {
       hostName = "localhost";
     }
-    
-    //grab compression
-    String compression = getConf().get("test.io.compression.class", null);
-    Class<? extends CompressionCodec> codec;
-
-    //try to initialize codec
-    try {
-      codec = (compression == null) ? null : 
-     Class.forName(compression).asSubclass(CompressionCodec.class);
-    } catch(Exception e) {
-      throw new RuntimeException("Compression codec not found: ", e);
-    }
-
-    if(codec != null) {
-      compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codec, getConf());
-    }
   }
 
   public void close() throws IOException {
@@ -98,6 +80,18 @@ public abstract class IOMapperBase<T> ex
                        long value) throws IOException;
 
   /**
+   * Create an input or output stream based on the specified file.
+   * Subclasses should override this method to provide an actual stream.
+   * 
+   * @param name file name
+   * @return the stream
+   * @throws IOException
+   */
+  public Closeable getIOStream(String name) throws IOException {
+    return null;
+  }
+
+  /**
    * Collect stat data to be combined by a subsequent reducer.
    * 
    * @param output
@@ -132,9 +126,15 @@ public abstract class IOMapperBase<T> ex
     long longValue = value.get();
     
     reporter.setStatus("starting " + name + " ::host = " + hostName);
-    
+
+    this.stream = getIOStream(name);
+    T statValue = null;
     long tStart = System.currentTimeMillis();
-    T statValue = doIO(reporter, name, longValue);
+    try {
+      statValue = doIO(reporter, name, longValue);
+    } finally {
+      if(stream != null) stream.close();
+    }
     long tEnd = System.currentTimeMillis();
     long execTime = tEnd - tStart;
     collectStats(output, name, execTime, statValue);

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java?rev=1390161&r1=1390160&r2=1390161&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
Tue Sep 25 21:36:30 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs;
 
 import java.io.BufferedReader;
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -28,10 +29,9 @@ import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.Date;
+import java.util.Random;
 import java.util.StringTokenizer;
 
-import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,18 +39,30 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 /**
  * Distributed i/o benchmark.
  * <p>
  * This test writes into or reads from a specified number of files.
- * File size is specified as a parameter to the test. 
+ * Number of bytes to write or read is specified as a parameter to the test. 
  * Each file is accessed in a separate map task.
  * <p>
  * The reducer collects the following statistics:
@@ -73,24 +85,24 @@ import org.apache.hadoop.util.ToolRunner
  * <li>standard deviation of i/o rate </li>
  * </ul>
  */
-public class TestDFSIO extends TestCase implements Tool {
+public class TestDFSIO implements Tool {
   // Constants
   private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
-  private static final int TEST_TYPE_READ = 0;
-  private static final int TEST_TYPE_WRITE = 1;
-  private static final int TEST_TYPE_CLEANUP = 2;
-  private static final int TEST_TYPE_APPEND = 3;
   private static final int DEFAULT_BUFFER_SIZE = 1000000;
   private static final String BASE_FILE_NAME = "test_io_";
   private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
   private static final long MEGA = ByteMultiple.MB.value();
+  private static final int DEFAULT_NR_BYTES = 1;
+  private static final int DEFAULT_NR_FILES = 4;
   private static final String USAGE =
-                            "Usage: " + TestDFSIO.class.getSimpleName() +
-                            " [genericOptions]" +
-                            " -read | -write | -append | -clean [-nrFiles N]" +
-                            " [-fileSize Size[B|KB|MB|GB|TB]]" +
-                            " [-resFile resultFileName] [-bufferSize Bytes]" +
-                            " [-rootDir]";
+                    "Usage: " + TestDFSIO.class.getSimpleName() +
+                    " [genericOptions]" +
+                    " -read [-random | -backward | -skip [-skipSize Size]] |" +
+                    " -write | -append | -clean" +
+                    " [-nrFiles N]" +
+                    " [-size Size[B|KB|MB|GB|TB]]" +
+                    " [-resFile resultFileName] [-bufferSize Bytes]" +
+                    " [-rootDir]";
 
   private Configuration config;
 
@@ -101,6 +113,27 @@ public class TestDFSIO extends TestCase 
     Configuration.addDefaultResource("mapred-site.xml");
   }
 
+  private static enum TestType {
+    TEST_TYPE_READ("read"),
+    TEST_TYPE_WRITE("write"),
+    TEST_TYPE_CLEANUP("cleanup"),
+    TEST_TYPE_APPEND("append"),
+    TEST_TYPE_READ_RANDOM("random read"),
+    TEST_TYPE_READ_BACKWARD("backward read"),
+    TEST_TYPE_READ_SKIP("skip read");
+
+    private String type;
+
+    private TestType(String t) {
+      type = t;
+    }
+
+    @Override // String
+    public String toString() {
+      return type;
+    }
+  }
+
   static enum ByteMultiple {
     B(1L),
     KB(0x400L),
@@ -155,62 +188,100 @@ public class TestDFSIO extends TestCase 
   private static Path getAppendDir(Configuration conf) {
     return new Path(getBaseDir(conf), "io_append");
   }
+  private static Path getRandomReadDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_random_read");
+  }
   private static Path getDataDir(Configuration conf) {
     return new Path(getBaseDir(conf), "io_data");
   }
 
-  /**
-   * Run the test with default parameters.
-   * 
-   * @throws Exception
-   */
-  public void testIOs() throws Exception {
-    TestDFSIO bench = new TestDFSIO();
-    bench.testIOs(1, 4);
+  private static MiniDFSCluster cluster;
+  private static TestDFSIO bench;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    bench = new TestDFSIO();
+    bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
+    cluster = new MiniDFSCluster.Builder(bench.getConf())
+                                .numDataNodes(2)
+                                .format(true)
+                                .build();
+    FileSystem fs = cluster.getFileSystem();
+    bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    if(cluster == null)
+      return;
+    FileSystem fs = cluster.getFileSystem();
+    bench.cleanup(fs);
+    cluster.shutdown();
   }
 
-  /**
-   * Run the test with the specified parameters.
-   * 
-   * @param fileSize file size
-   * @param nrFiles number of files
-   * @throws IOException
-   */
-  public void testIOs(int fileSize, int nrFiles)
-    throws IOException {
-    config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
-    MiniDFSCluster cluster = null;
-    try {
-      cluster = new MiniDFSCluster(config, 2, true, null);
-      FileSystem fs = cluster.getFileSystem();
+  @Test
+  public void testWrite() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.writeTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_WRITE, execTime);
+  }
 
-      createControlFile(fs, fileSize, nrFiles);
-      long tStart = System.currentTimeMillis();
-      writeTest(fs);
-      long execTime = System.currentTimeMillis() - tStart;
-      analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME);
+  @Test
+  public void testRead() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.readTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ, execTime);
+  }
 
-      tStart = System.currentTimeMillis();
-      readTest(fs);
-      execTime = System.currentTimeMillis() - tStart;
-      analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME);
+  @Test
+  public void testReadRandom() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.getConf().setLong("test.io.skip.size", 0);
+    bench.randomReadTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_RANDOM, execTime);
+  }
 
-      tStart = System.currentTimeMillis();
-      appendTest(fs);
-      execTime = System.currentTimeMillis() - tStart;
-      analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME);
+  @Test
+  public void testReadBackward() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.getConf().setLong("test.io.skip.size", -DEFAULT_BUFFER_SIZE);
+    bench.randomReadTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_BACKWARD, execTime);
+  }
 
-      cleanup(fs);
-    } finally {
-      if(cluster != null) cluster.shutdown();
-    }
+  @Test
+  public void testReadSkip() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.getConf().setLong("test.io.skip.size", 1);
+    bench.randomReadTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, execTime);
   }
 
+  @Test
+  public void testAppend() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.appendTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime);
+  }
+
+  @SuppressWarnings("deprecation")
   private void createControlFile(FileSystem fs,
-                                  long fileSize, // in bytes
+                                  long nrBytes, // in bytes
                                   int nrFiles
                                 ) throws IOException {
-    LOG.info("creating control file: "+fileSize+" bytes, "+nrFiles+" files");
+    LOG.info("creating control file: "+nrBytes+" bytes, "+nrFiles+" files");
 
     Path controlDir = getControlDir(config);
     fs.delete(controlDir, true);
@@ -223,7 +294,7 @@ public class TestDFSIO extends TestCase 
         writer = SequenceFile.createWriter(fs, config, controlFile,
                                            Text.class, LongWritable.class,
                                            CompressionType.NONE);
-        writer.append(new Text(name), new LongWritable(fileSize));
+        writer.append(new Text(name), new LongWritable(nrBytes));
       } catch(Exception e) {
         throw new IOException(e.getLocalizedMessage());
       } finally {
@@ -251,10 +322,35 @@ public class TestDFSIO extends TestCase 
    * <li>i/o rate squared</li>
    * </ul>
    */
-  private abstract static class IOStatMapper<T> extends IOMapperBase<T> {
-    IOStatMapper() { 
+  private abstract static class IOStatMapper extends IOMapperBase<Long> {
+    protected CompressionCodec compressionCodec;
+
+    IOStatMapper() {
     }
-    
+
+    @Override // Mapper
+    public void configure(JobConf conf) {
+      super.configure(conf);
+
+      // grab compression
+      String compression = getConf().get("test.io.compression.class", null);
+      Class<? extends CompressionCodec> codec;
+
+      // try to initialize codec
+      try {
+        codec = (compression == null) ? null : 
+          Class.forName(compression).asSubclass(CompressionCodec.class);
+      } catch(Exception e) {
+        throw new RuntimeException("Compression codec not found: ", e);
+      }
+
+      if(codec != null) {
+        compressionCodec = (CompressionCodec)
+            ReflectionUtils.newInstance(codec, getConf());
+      }
+    }
+
+    @Override // IOMapperBase
     void collectStats(OutputCollector<Text, Text> output, 
                       String name,
                       long execTime, 
@@ -281,36 +377,38 @@ public class TestDFSIO extends TestCase 
   /**
    * Write mapper class.
    */
-  public static class WriteMapper extends IOStatMapper<Long> {
+  public static class WriteMapper extends IOStatMapper {
 
     public WriteMapper() { 
       for(int i=0; i < bufferSize; i++)
         buffer[i] = (byte)('0' + i % 50);
     }
 
-    @Override
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      // create file
+      OutputStream out =
+          fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
+      if(compressionCodec != null)
+        out = compressionCodec.createOutputStream(out);
+      LOG.info("out = " + out.getClass().getName());
+      return out;
+    }
+
+    @Override // IOMapperBase
     public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize // in bytes
                      ) throws IOException {
-      // create file
-      OutputStream out;
-      out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
-    
-      if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
-      
-      try {
-        // write to the file
-        long nrRemaining;
-        for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
-          int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; 
-          out.write(buffer, 0, curSize);
-          reporter.setStatus("writing " + name + "@" + 
-                             (totalSize - nrRemaining) + "/" + totalSize 
-                             + " ::host = " + hostName);
-        }
-      } finally {
-        out.close();
+      OutputStream out = (OutputStream)this.stream;
+      // write to the file
+      long nrRemaining;
+      for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
+        int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
+        out.write(buffer, 0, curSize);
+        reporter.setStatus("writing " + name + "@" + 
+                           (totalSize - nrRemaining) + "/" + totalSize 
+                           + " ::host = " + hostName);
       }
       return Long.valueOf(totalSize);
     }
@@ -324,7 +422,6 @@ public class TestDFSIO extends TestCase 
     runIOTest(WriteMapper.class, writeDir);
   }
   
-  @SuppressWarnings("deprecation")
   private void runIOTest(
           Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,

           Path outputDir) throws IOException {
@@ -346,35 +443,38 @@ public class TestDFSIO extends TestCase 
   /**
    * Append mapper class.
    */
-  public static class AppendMapper extends IOStatMapper<Long> {
+  public static class AppendMapper extends IOStatMapper {
 
     public AppendMapper() { 
       for(int i=0; i < bufferSize; i++)
         buffer[i] = (byte)('0' + i % 50);
     }
 
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      // open file for append
+      OutputStream out =
+          fs.append(new Path(getDataDir(getConf()), name), bufferSize);
+      if(compressionCodec != null)
+        out = compressionCodec.createOutputStream(out);
+      LOG.info("out = " + out.getClass().getName());
+      return out;
+    }
+
+    @Override // IOMapperBase
     public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize // in bytes
                      ) throws IOException {
-      // create file
-      OutputStream out;
-      out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
-      
-      if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
-      
-      try {
-        // write to the file
-        long nrRemaining;
-        for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
-          int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; 
-          out.write(buffer, 0, curSize);
-          reporter.setStatus("writing " + name + "@" + 
-                             (totalSize - nrRemaining) + "/" + totalSize 
-                             + " ::host = " + hostName);
-        }
-      } finally {
-        out.close();
+      OutputStream out = (OutputStream)this.stream;
+      // write to the file
+      long nrRemaining;
+      for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
+        int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
+        out.write(buffer, 0, curSize);
+        reporter.setStatus("writing " + name + "@" + 
+                           (totalSize - nrRemaining) + "/" + totalSize 
+                           + " ::host = " + hostName);
       }
       return Long.valueOf(totalSize);
     }
@@ -389,32 +489,35 @@ public class TestDFSIO extends TestCase 
   /**
    * Read mapper class.
    */
-  public static class ReadMapper extends IOStatMapper<Long> {
+  public static class ReadMapper extends IOStatMapper {
 
     public ReadMapper() { 
     }
 
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      // open file
+      InputStream in = fs.open(new Path(getDataDir(getConf()), name));
+      if(compressionCodec != null)
+        in = compressionCodec.createInputStream(in);
+      LOG.info("in = " + in.getClass().getName());
+      return in;
+    }
+
+    @Override // IOMapperBase
     public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize // in bytes
                      ) throws IOException {
-      // open file
-      InputStream in = fs.open(new Path(getDataDir(getConf()), name));
-      
-      if(compressionCodec != null) in = compressionCodec.createInputStream(in);
-      
+      InputStream in = (InputStream)this.stream;
       long actualSize = 0;
-      try {
-        while (actualSize < totalSize) {
-          int curSize = in.read(buffer, 0, bufferSize);
-          if(curSize < 0) break;
-          actualSize += curSize;
-          reporter.setStatus("reading " + name + "@" + 
-                             actualSize + "/" + totalSize 
-                             + " ::host = " + hostName);
-        }
-      } finally {
-        in.close();
+      while (actualSize < totalSize) {
+        int curSize = in.read(buffer, 0, bufferSize);
+        if(curSize < 0) break;
+        actualSize += curSize;
+        reporter.setStatus("reading " + name + "@" + 
+                           actualSize + "/" + totalSize 
+                           + " ::host = " + hostName);
       }
       return Long.valueOf(actualSize);
     }
@@ -426,20 +529,111 @@ public class TestDFSIO extends TestCase 
     runIOTest(ReadMapper.class, readDir);
   }
 
+  /**
+   * Mapper class for random reads.
+   * The mapper chooses a position in the file and reads bufferSize
+   * bytes starting at the chosen position.
+   * It stops after reading the totalSize bytes, specified by -size.
+   * 
+   * There are three type of reads.
+   * 1) Random read always chooses a random position to read from: skipSize = 0
+   * 2) Backward read reads file in reverse order                : skipSize < 0
+   * 3) Skip-read skips skipSize bytes after every read          : skipSize > 0
+   */
+  public static class RandomReadMapper extends IOStatMapper {
+    private Random rnd;
+    private long fileSize;
+    private long skipSize;
+
+    @Override // Mapper
+    public void configure(JobConf conf) {
+      super.configure(conf);
+      skipSize = conf.getLong("test.io.skip.size", 0);
+    }
+
+    public RandomReadMapper() { 
+      rnd = new Random();
+    }
+
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      Path filePath = new Path(getDataDir(getConf()), name);
+      this.fileSize = fs.getFileStatus(filePath).getLen();
+      InputStream in = fs.open(filePath);
+      if(compressionCodec != null)
+        in = new FSDataInputStream(compressionCodec.createInputStream(in));
+      LOG.info("in = " + in.getClass().getName());
+      LOG.info("skipSize = " + skipSize);
+      return in;
+    }
+
+    @Override // IOMapperBase
+    public Long doIO(Reporter reporter, 
+                       String name, 
+                       long totalSize // in bytes
+                     ) throws IOException {
+      PositionedReadable in = (PositionedReadable)this.stream;
+      long actualSize = 0;
+      for(long pos = nextOffset(-1);
+          actualSize < totalSize; pos = nextOffset(pos)) {
+        int curSize = in.read(pos, buffer, 0, bufferSize);
+        if(curSize < 0) break;
+        actualSize += curSize;
+        reporter.setStatus("reading " + name + "@" + 
+                           actualSize + "/" + totalSize 
+                           + " ::host = " + hostName);
+      }
+      return Long.valueOf(actualSize);
+    }
+
+    /**
+     * Get next offset for reading.
+     * If current < 0 then choose initial offset according to the read type.
+     * 
+     * @param current offset
+     * @return
+     */
+    private long nextOffset(long current) {
+      if(skipSize == 0)
+        return rnd.nextInt((int)(fileSize));
+      if(skipSize > 0)
+        return (current < 0) ? 0 : (current + bufferSize + skipSize);
+      // skipSize < 0
+      return (current < 0) ? Math.max(0, fileSize - bufferSize) :
+                             Math.max(0, current + skipSize);
+    }
+  }
+
+  private void randomReadTest(FileSystem fs) throws IOException {
+    Path readDir = getRandomReadDir(config);
+    fs.delete(readDir, true);
+    runIOTest(RandomReadMapper.class, readDir);
+  }
+
   private void sequentialTest(FileSystem fs, 
-                              int testType, 
+                              TestType testType, 
                               long fileSize, // in bytes
                               int nrFiles
                              ) throws IOException {
-    IOStatMapper<Long> ioer = null;
-    if (testType == TEST_TYPE_READ)
+    IOStatMapper ioer = null;
+    switch(testType) {
+    case TEST_TYPE_READ:
       ioer = new ReadMapper();
-    else if (testType == TEST_TYPE_WRITE)
+      break;
+    case TEST_TYPE_WRITE:
       ioer = new WriteMapper();
-    else if (testType == TEST_TYPE_APPEND)
+      break;
+    case TEST_TYPE_APPEND:
       ioer = new AppendMapper();
-    else
+      break;
+    case TEST_TYPE_READ_RANDOM:
+    case TEST_TYPE_READ_BACKWARD:
+    case TEST_TYPE_READ_SKIP:
+      ioer = new RandomReadMapper();
+      break;
+    default:
       return;
+    }
     for(int i=0; i < nrFiles; i++)
       ioer.doIO(Reporter.NULL,
                 BASE_FILE_NAME+Integer.toString(i), 
@@ -462,14 +656,15 @@ public class TestDFSIO extends TestCase 
 
   @Override // Tool
   public int run(String[] args) throws IOException {
-    int testType = TEST_TYPE_READ;
+    TestType testType = null;
     int bufferSize = DEFAULT_BUFFER_SIZE;
-    long fileSize = 1*MEGA;
+    long nrBytes = 1*MEGA;
     int nrFiles = 1;
+    long skipSize = 0;
     String resFileName = DEFAULT_RES_FILE_NAME;
     String compressionClass = null;
     boolean isSequential = false;
-    String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
+    String version = TestDFSIO.class.getSimpleName() + ".1.7";
 
     LOG.info(version);
     if (args.length == 0) {
@@ -479,21 +674,32 @@ public class TestDFSIO extends TestCase 
 
     for (int i = 0; i < args.length; i++) {       // parse command line
       if (args[i].startsWith("-read")) {
-        testType = TEST_TYPE_READ;
+        testType = TestType.TEST_TYPE_READ;
       } else if (args[i].equals("-write")) {
-        testType = TEST_TYPE_WRITE;
+        testType = TestType.TEST_TYPE_WRITE;
       } else if (args[i].equals("-append")) {
-        testType = TEST_TYPE_APPEND;
+        testType = TestType.TEST_TYPE_APPEND;
+      } else if (args[i].equals("-random")) {
+        if(testType != TestType.TEST_TYPE_READ) return -1;
+        testType = TestType.TEST_TYPE_READ_RANDOM;
+      } else if (args[i].equals("-backward")) {
+        if(testType != TestType.TEST_TYPE_READ) return -1;
+        testType = TestType.TEST_TYPE_READ_BACKWARD;
+      } else if (args[i].equals("-skip")) {
+        if(testType != TestType.TEST_TYPE_READ) return -1;
+        testType = TestType.TEST_TYPE_READ_SKIP;
       } else if (args[i].equals("-clean")) {
-        testType = TEST_TYPE_CLEANUP;
+        testType = TestType.TEST_TYPE_CLEANUP;
       } else if (args[i].startsWith("-seq")) {
         isSequential = true;
       } else if (args[i].startsWith("-compression")) {
         compressionClass = args[++i];
       } else if (args[i].equals("-nrFiles")) {
         nrFiles = Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-fileSize")) {
-        fileSize = parseSize(args[++i]);
+      } else if (args[i].equals("-fileSize") || args[i].equals("-size")) {
+        nrBytes = parseSize(args[++i]);
+      } else if (args[i].equals("-skipSize")) {
+        skipSize = parseSize(args[++i]);
       } else if (args[i].equals("-bufferSize")) {
         bufferSize = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-resFile")) {
@@ -503,10 +709,18 @@ public class TestDFSIO extends TestCase 
         return -1;
       }
     }
+    if(testType == null)
+      return -1;
+    if(testType == TestType.TEST_TYPE_READ_BACKWARD)
+      skipSize = -bufferSize;
+    else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
+      skipSize = bufferSize;
 
     LOG.info("nrFiles = " + nrFiles);
-    LOG.info("fileSize (MB) = " + toMB(fileSize));
+    LOG.info("nrBytes (MB) = " + toMB(nrBytes));
     LOG.info("bufferSize = " + bufferSize);
+    if(skipSize > 0)
+      LOG.info("skipSize = " + skipSize);
     LOG.info("baseDir = " + getBaseDir(config));
     
     if(compressionClass != null) {
@@ -515,29 +729,39 @@ public class TestDFSIO extends TestCase 
     }
 
     config.setInt("test.io.file.buffer.size", bufferSize);
+    config.setLong("test.io.skip.size", skipSize);
     config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
     FileSystem fs = FileSystem.get(config);
 
     if (isSequential) {
       long tStart = System.currentTimeMillis();
-      sequentialTest(fs, testType, fileSize, nrFiles);
+      sequentialTest(fs, testType, nrBytes, nrFiles);
       long execTime = System.currentTimeMillis() - tStart;
       String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
       LOG.info(resultLine);
       return 0;
     }
-    if (testType == TEST_TYPE_CLEANUP) {
+    if (testType == TestType.TEST_TYPE_CLEANUP) {
       cleanup(fs);
       return 0;
     }
-    createControlFile(fs, fileSize, nrFiles);
+    createControlFile(fs, nrBytes, nrFiles);
     long tStart = System.currentTimeMillis();
-    if (testType == TEST_TYPE_WRITE)
+    switch(testType) {
+    case TEST_TYPE_WRITE:
       writeTest(fs);
-    if (testType == TEST_TYPE_READ)
+      break;
+    case TEST_TYPE_READ:
       readTest(fs);
-    if (testType == TEST_TYPE_APPEND)
+      break;
+    case TEST_TYPE_APPEND:
       appendTest(fs);
+      break;
+    case TEST_TYPE_READ_RANDOM:
+    case TEST_TYPE_READ_BACKWARD:
+    case TEST_TYPE_READ_SKIP:
+      randomReadTest(fs);
+    }
     long execTime = System.currentTimeMillis() - tStart;
   
     analyzeResult(fs, testType, execTime, resFileName);
@@ -563,9 +787,9 @@ public class TestDFSIO extends TestCase 
+  /**
+   * Parse a size argument of the form digits + optional byte-multiple suffix
+   * (e.g. "128MB") into a byte count: the numeric prefix times the value of
+   * the {@code ByteMultiple} named by the trailing suffix.
+   * NOTE(review): behavior for an empty/unknown suffix is decided by
+   * ByteMultiple.parseString, which is outside this hunk — confirm there.
+   */
  static long parseSize(String arg) {
    String[] args = arg.split("\\D", 2);  // get digits
    assert args.length <= 2;
-    long fileSize = Long.parseLong(args[0]);
+    long nrBytes = Long.parseLong(args[0]);
    String bytesMult = arg.substring(args[0].length()); // get byte multiple
-    return fileSize * ByteMultiple.parseString(bytesMult).value();
+    return nrBytes * ByteMultiple.parseString(bytesMult).value();
  }
 
   static float toMB(long bytes) {
@@ -573,17 +797,11 @@ public class TestDFSIO extends TestCase 
   }
 
   private void analyzeResult(	FileSystem fs,
-                              int testType,
+                              TestType testType,
                               long execTime,
                               String resFileName
                             ) throws IOException {
-    Path reduceFile;
-    if (testType == TEST_TYPE_WRITE)
-      reduceFile = new Path(getWriteDir(config), "part-00000");
-    else if (testType == TEST_TYPE_APPEND)
-      reduceFile = new Path(getAppendDir(config), "part-00000");
-    else // if (testType == TEST_TYPE_READ)
-      reduceFile = new Path(getReadDir(config), "part-00000");
+    Path reduceFile = getReduceFilePath(testType);
     long tasks = 0;
     long size = 0;
     long time = 0;
@@ -617,10 +835,7 @@ public class TestDFSIO extends TestCase 
     double med = rate / 1000 / tasks;
     double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
     String resultLines[] = {
-      "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
-                                    (testType == TEST_TYPE_READ) ? "read" : 
-                                    (testType == TEST_TYPE_APPEND) ? "append" : 
-                                    "unknown"),
+      "----- TestDFSIO ----- : " + testType,
       "           Date & time: " + new Date(System.currentTimeMillis()),
       "       Number of files: " + tasks,
       "Total MBytes processed: " + toMB(size),
@@ -642,6 +857,27 @@ public class TestDFSIO extends TestCase 
     }
   }
 
+  /**
+   * Map a test type to its reduce output file ("part-00000") under that
+   * test's working directory. The three random-read variants (RANDOM,
+   * BACKWARD, SKIP) deliberately fall through to share the random-read
+   * directory. Returns null for any type not listed in the switch
+   * (e.g. TEST_TYPE_CLEANUP, which produces no reduce output).
+   */
+  private Path getReduceFilePath(TestType testType) {
+    switch(testType) {
+    case TEST_TYPE_WRITE:
+      return new Path(getWriteDir(config), "part-00000");
+    case TEST_TYPE_APPEND:
+      return new Path(getAppendDir(config), "part-00000");
+    case TEST_TYPE_READ:
+      return new Path(getReadDir(config), "part-00000");
+    case TEST_TYPE_READ_RANDOM:
+    case TEST_TYPE_READ_BACKWARD:
+    case TEST_TYPE_READ_SKIP:
+      return new Path(getRandomReadDir(config), "part-00000");
+    }
+    return null;  // no reduce output for the remaining test types
+  }
+
+  /**
+   * Convenience overload: analyze benchmark results and write them to the
+   * default result file (DEFAULT_RES_FILE_NAME).
+   */
+  private void analyzeResult(FileSystem fs, TestType testType, long execTime)
+      throws IOException {
+    analyzeResult(fs, testType, execTime, DEFAULT_RES_FILE_NAME);
+  }
+
   private void cleanup(FileSystem fs)
   throws IOException {
     LOG.info("Cleaning up test files");



Mime
View raw message